diff --git a/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc b/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc index 83692986b1bb5e..19580620eccac1 100644 --- a/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc +++ b/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc @@ -3,7 +3,7 @@ #include "test/rbd_mirror/test_mock_fixture.h" #include "librbd/journal/TypeTraits.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" +#include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" @@ -44,16 +44,42 @@ namespace mirror { class ProgressContext; template<> -struct ImageSyncThrottler { - MOCK_METHOD10(start_sync, void(librbd::MockTestImageCtx *local_image_ctx, - librbd::MockTestImageCtx *remote_image_ctx, - SafeTimer *timer, Mutex *timer_lock, - const std::string &mirror_uuid, - ::journal::MockJournaler *journaler, - librbd::journal::MirrorPeerClientMeta *client_meta, - ContextWQ *work_queue, Context *on_finish, - ProgressContext *progress_ctx)); - MOCK_METHOD1(cancel_sync, void(const std::string& mirror_uuid)); +struct ImageSync { + static ImageSync* s_instance; + Context *on_finish = nullptr; + + static ImageSync* create( + librbd::MockTestImageCtx *local_image_ctx, + librbd::MockTestImageCtx *remote_image_ctx, + SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid, + ::journal::MockJournaler *journaler, + librbd::journal::MirrorPeerClientMeta *client_meta, ContextWQ *work_queue, + InstanceWatcher *instance_watcher, + Context *on_finish, ProgressContext *progress_ctx) { + assert(s_instance != nullptr); + s_instance->on_finish = on_finish; + return s_instance; + } + + ImageSync() { + assert(s_instance == nullptr); + s_instance = this; + } + ~ImageSync() { + s_instance = nullptr; + } + + MOCK_METHOD0(get, void()); + MOCK_METHOD0(put, void()); + MOCK_METHOD0(send, void()); + MOCK_METHOD0(cancel, void()); +}; + +ImageSync* + ImageSync::s_instance = nullptr; + +template<> +struct InstanceWatcher { }; namespace image_replayer { @@ -240,10 +266,11 @@ MATCHER_P(IsSameIoCtx, io_ctx, "") { class TestMockImageReplayerBootstrapRequest : public TestMockFixture { public: - typedef ImageSyncThrottlerRef MockImageSyncThrottler; typedef BootstrapRequest MockBootstrapRequest; typedef CloseImageRequest MockCloseImageRequest; typedef CreateImageRequest MockCreateImageRequest; + typedef ImageSync MockImageSync; + typedef InstanceWatcher MockInstanceWatcher; typedef IsPrimaryRequest MockIsPrimaryRequest; typedef OpenImageRequest MockOpenImageRequest; typedef OpenLocalImageRequest MockOpenLocalImageRequest; @@ -380,14 +407,13 @@ class TestMockImageReplayerBootstrapRequest : public TestMockFixture { })); } - void expect_image_sync(MockImageSyncThrottler image_sync_throttler, - int r) { - EXPECT_CALL(*image_sync_throttler, start_sync(_, _, _, _, - StrEq("local mirror uuid"), - _, _, _, _, _)) - .WillOnce(WithArg<8>(Invoke([this, r](Context *on_finish) { - m_threads->work_queue->queue(on_finish, r); - }))); + void expect_image_sync(MockImageSync &mock_image_sync, int r) { + EXPECT_CALL(mock_image_sync, get()); + EXPECT_CALL(mock_image_sync, send()) + .WillOnce(Invoke([this, &mock_image_sync, r]() { + m_threads->work_queue->queue(mock_image_sync.on_finish, r); + })); + EXPECT_CALL(mock_image_sync, put()); } bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) { @@ -396,7 +422,7 @@ class TestMockImageReplayerBootstrapRequest : public TestMockFixture { return bl; } - MockBootstrapRequest *create_request(MockImageSyncThrottler mock_image_sync_throttler, + MockBootstrapRequest *create_request(MockInstanceWatcher *mock_instance_watcher, ::journal::MockJournaler &mock_journaler, const std::string &local_image_id, const std::string &remote_image_id, @@ -406,7 +432,7 @@ class TestMockImageReplayerBootstrapRequest : public TestMockFixture { Context *on_finish) { return new MockBootstrapRequest(m_local_io_ctx, m_remote_io_ctx, - mock_image_sync_throttler, + mock_instance_watcher, &m_local_test_image_ctx, local_image_id, remote_image_id, @@ -472,10 +498,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, NonPrimaryRemoteSyncingState) { expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id, + &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id, mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); request->send(); @@ -547,10 +572,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, RemoteDemotePromote) { expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id, + &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id, mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); request->send(); @@ -632,10 +656,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, MultipleRemoteDemotePromotes) { expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id, + &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id, mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); request->send(); @@ -705,10 +728,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, LocalDemoteRemotePromote) { expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id, + &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id, mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); request->send(); @@ -777,10 +799,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, SplitBrainForcePromote) { expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id, + &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id, mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); request->send(); @@ -837,10 +858,9 @@ TEST_F(TestMockImageReplayerBootstrapRequest, ResyncRequested) { expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id, + &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id, mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); m_do_resync = false; @@ -905,16 +925,16 @@ TEST_F(TestMockImageReplayerBootstrapRequest, PrimaryRemote) { expect_journaler_update_client(mock_journaler, client_data, 0); // sync the remote image to the local image - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); - expect_image_sync(mock_image_sync_throttler, 0); + MockImageSync mock_image_sync; + expect_image_sync(mock_image_sync, 0); MockCloseImageRequest mock_close_image_request; expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, "", + &mock_instance_watcher, mock_journaler, "", mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); request->send(); @@ -981,16 +1001,16 @@ TEST_F(TestMockImageReplayerBootstrapRequest, PrimaryRemoteLocalDeleted) { expect_journaler_update_client(mock_journaler, client_data, 0); // sync the remote image to the local image - MockImageSyncThrottler mock_image_sync_throttler( - new ImageSyncThrottler()); - expect_image_sync(mock_image_sync_throttler, 0); + MockImageSync mock_image_sync; + expect_image_sync(mock_image_sync, 0); MockCloseImageRequest mock_close_image_request; expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0); C_SaferCond ctx; + MockInstanceWatcher mock_instance_watcher; MockBootstrapRequest *request = create_request( - mock_image_sync_throttler, mock_journaler, "", + &mock_instance_watcher, mock_journaler, "", mock_remote_image_ctx.id, "global image id", "local mirror uuid", "remote mirror uuid", &ctx); request->send(); diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 9ca82925322adb..3b66c20639c0d9 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -36,7 +36,7 @@ #include "librbd/io/ReadResult.h" #include "tools/rbd_mirror/types.h" #include "tools/rbd_mirror/ImageReplayer.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" +#include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" #include "tools/rbd_mirror/ImageDeleter.h" @@ -118,14 +118,19 @@ class TestImageReplayer : public ::rbd::mirror::TestFixture { m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock)); - m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>()); + m_instance_watcher = rbd::mirror::InstanceWatcher<>::create( + m_local_ioctx, m_threads->work_queue, nullptr); + m_instance_watcher->handle_acquire_leader(); } - ~TestImageReplayer() override + ~TestImageReplayer() override { unwatch(); + m_instance_watcher->handle_release_leader(); + delete m_replayer; + delete m_instance_watcher; delete m_threads; EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str())); @@ -134,9 +139,10 @@ class TestImageReplayer : public ::rbd::mirror::TestFixture { template > void create_replayer() { - m_replayer = new ImageReplayerT(m_threads, m_image_deleter, m_image_sync_throttler, - rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), - m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id"); + m_replayer = new ImageReplayerT( + m_threads, m_image_deleter, m_instance_watcher, + rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), + m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id"); m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id, m_remote_ioctx); } @@ -364,7 +370,7 @@ class TestImageReplayer : public ::rbd::mirror::TestFixture { std::shared_ptr m_image_deleter; std::shared_ptr m_local_cluster; librados::Rados m_remote_cluster; - std::shared_ptr> m_image_sync_throttler; + rbd::mirror::InstanceWatcher<> *m_instance_watcher; std::string m_local_mirror_uuid = "local mirror uuid"; std::string m_remote_mirror_uuid = "remote mirror uuid"; std::string m_local_pool_name, m_remote_pool_name; diff --git a/src/test/rbd_mirror/test_ImageSync.cc b/src/test/rbd_mirror/test_ImageSync.cc index cd50af8ded533f..8097c6e08f3fd7 100644 --- a/src/test/rbd_mirror/test_ImageSync.cc +++ b/src/test/rbd_mirror/test_ImageSync.cc @@ -15,6 +15,7 @@ #include "librbd/io/ReadResult.h" #include "librbd/journal/Types.h" #include "tools/rbd_mirror/ImageSync.h" +#include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" void register_test_image_sync() { @@ -55,6 +56,10 @@ class TestImageSync : public TestFixture { create_and_open(m_local_io_ctx, &m_local_image_ctx); create_and_open(m_remote_io_ctx, &m_remote_image_ctx); + m_instance_watcher = rbd::mirror::InstanceWatcher<>::create( + m_local_io_ctx, m_threads->work_queue, nullptr); + m_instance_watcher->handle_acquire_leader(); + m_remote_journaler = new ::journal::Journaler( m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, m_remote_io_ctx, m_remote_image_ctx->id, "mirror-uuid", {}); @@ -70,7 +75,11 @@ class TestImageSync : public TestFixture { void TearDown() override { TestFixture::TearDown(); + + m_instance_watcher->handle_release_leader(); + delete m_remote_journaler; + delete m_instance_watcher; } void create_and_open(librados::IoCtx &io_ctx, librbd::ImageCtx **image_ctx) { @@ -91,11 +100,12 @@ class TestImageSync : public TestFixture { return new ImageSync<>(m_local_image_ctx, m_remote_image_ctx, m_threads->timer, &m_threads->timer_lock, "mirror-uuid", m_remote_journaler, &m_client_meta, - m_threads->work_queue, ctx); + m_threads->work_queue, m_instance_watcher, ctx); } librbd::ImageCtx *m_remote_image_ctx; librbd::ImageCtx *m_local_image_ctx; + rbd::mirror::InstanceWatcher<> *m_instance_watcher; ::journal::Journaler *m_remote_journaler; librbd::journal::MirrorPeerClientMeta m_client_meta; }; diff --git a/src/test/rbd_mirror/test_LeaderWatcher.cc b/src/test/rbd_mirror/test_LeaderWatcher.cc index 7f248aa1e8e914..c1699892a731e1 100644 --- a/src/test/rbd_mirror/test_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_LeaderWatcher.cc @@ -71,6 +71,9 @@ class TestLeaderWatcher : public ::rbd::mirror::TestFixture { } } + void update_leader_handler(const std::string &leader_instance_id) override { + } + private: mutable Mutex m_test_lock; int m_acquire_count = 0; diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index e9dfde936b65b0..3ffe83784aba5a 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -5,11 +5,11 @@ #include "librbd/journal/Replay.h" #include "librbd/journal/Types.h" #include "tools/rbd_mirror/ImageReplayer.h" +#include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h" #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "test/rbd_mirror/test_mock_fixture.h" #include "test/journal/mock/MockJournaler.h" #include "test/librbd/mock/MockImageCtx.h" @@ -61,7 +61,7 @@ namespace rbd { namespace mirror { template<> -class ImageSyncThrottler { +class InstanceWatcher { }; namespace image_replayer { @@ -110,22 +110,18 @@ struct BootstrapRequest { Context *on_finish = nullptr; bool *do_resync = nullptr; - static BootstrapRequest* create(librados::IoCtx &local_io_ctx, - librados::IoCtx &remote_io_ctx, - rbd::mirror::ImageSyncThrottlerRef image_sync_throttler, - librbd::MockTestImageCtx **local_image_ctx, - const std::string &local_image_name, - const std::string &remote_image_id, - const std::string &global_image_id, - ContextWQ *work_queue, SafeTimer *timer, - Mutex *timer_lock, - const std::string &local_mirror_uuid, - const std::string &remote_mirror_uuid, - ::journal::MockJournalerProxy *journaler, - librbd::journal::MirrorPeerClientMeta *client_meta, - Context *on_finish, - bool *do_resync, - rbd::mirror::ProgressContext *progress_ctx = nullptr) { + static BootstrapRequest* create( + librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx, + rbd::mirror::InstanceWatcher *instance_watcher, + librbd::MockTestImageCtx **local_image_ctx, + const std::string &local_image_name, const std::string &remote_image_id, + const std::string &global_image_id, ContextWQ *work_queue, + SafeTimer *timer, Mutex *timer_lock, const std::string &local_mirror_uuid, + const std::string &remote_mirror_uuid, + ::journal::MockJournalerProxy *journaler, + librbd::journal::MirrorPeerClientMeta *client_meta, + Context *on_finish, bool *do_resync, + rbd::mirror::ProgressContext *progress_ctx = nullptr) { assert(s_instance != nullptr); s_instance->image_ctx = local_image_ctx; s_instance->on_finish = on_finish; @@ -261,6 +257,7 @@ class TestMockImageReplayer : public TestMockFixture { typedef ReplayStatusFormatter MockReplayStatusFormatter; typedef librbd::journal::Replay MockReplay; typedef ImageReplayer MockImageReplayer; + typedef InstanceWatcher MockInstanceWatcher; void SetUp() override { TestMockFixture::SetUp(); @@ -272,11 +269,8 @@ class TestMockImageReplayer : public TestMockFixture { m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock)); - m_image_sync_throttler.reset( - new rbd::mirror::ImageSyncThrottler()); - m_image_replayer = new MockImageReplayer( - m_threads, m_image_deleter, m_image_sync_throttler, + m_threads, m_image_deleter, &m_instance_watcher, rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)), "local_mirror_uuid", m_local_io_ctx.get_id(), "global image id"); m_image_replayer->add_remote_image( @@ -451,7 +445,7 @@ class TestMockImageReplayer : public TestMockFixture { librbd::ImageCtx *m_remote_image_ctx; librbd::ImageCtx *m_local_image_ctx = nullptr; std::shared_ptr m_image_deleter; - std::shared_ptr> m_image_sync_throttler; + MockInstanceWatcher m_instance_watcher; MockImageReplayer *m_image_replayer; }; diff --git a/src/test/rbd_mirror/test_mock_ImageSync.cc b/src/test/rbd_mirror/test_mock_ImageSync.cc index 676615d870c7cf..622cd60db5dad0 100644 --- a/src/test/rbd_mirror/test_mock_ImageSync.cc +++ b/src/test/rbd_mirror/test_mock_ImageSync.cc @@ -45,6 +45,13 @@ template class rbd::mirror::ImageSync; namespace rbd { namespace mirror { +template<> +struct InstanceWatcher { + MOCK_METHOD2(notify_sync_request, void(const std::string, Context *)); + MOCK_METHOD1(cancel_sync_request, bool(const std::string &)); + MOCK_METHOD1(notify_sync_complete, void(const std::string &)); +}; + namespace image_sync { template <> @@ -175,6 +182,7 @@ using ::testing::InvokeWithoutArgs; class TestMockImageSync : public TestMockFixture { public: typedef ImageSync MockImageSync; + typedef InstanceWatcher MockInstanceWatcher; typedef image_sync::ImageCopyRequest MockImageCopyRequest; typedef image_sync::SnapshotCopyRequest MockSnapshotCopyRequest; typedef image_sync::SyncPointCreateRequest MockSyncPointCreateRequest; @@ -191,6 +199,25 @@ class TestMockImageSync : public TestMockFixture { ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx)); } + void expect_notify_sync_request(MockInstanceWatcher &mock_instance_watcher, + const std::string &sync_id, int r) { + EXPECT_CALL(mock_instance_watcher, notify_sync_request(sync_id, _)) + .WillOnce(Invoke([this, r](const std::string &, Context *on_sync_start) { + m_threads->work_queue->queue(on_sync_start, r); + })); + } + + void expect_cancel_sync_request(MockInstanceWatcher &mock_instance_watcher, + const std::string &sync_id, bool canceled) { + EXPECT_CALL(mock_instance_watcher, cancel_sync_request(sync_id)) + .WillOnce(Return(canceled)); + } + + void expect_notify_sync_complete(MockInstanceWatcher &mock_instance_watcher, + const std::string &sync_id) { + EXPECT_CALL(mock_instance_watcher, notify_sync_complete(sync_id)); + } + void expect_create_sync_point(librbd::MockTestImageCtx &mock_local_image_ctx, MockSyncPointCreateRequest &mock_sync_point_create_request, int r) { @@ -265,11 +292,13 @@ class TestMockImageSync : public TestMockFixture { MockImageSync *create_request(librbd::MockTestImageCtx &mock_remote_image_ctx, librbd::MockTestImageCtx &mock_local_image_ctx, journal::MockJournaler &mock_journaler, + MockInstanceWatcher &mock_instance_watcher, Context *ctx) { return new MockImageSync(&mock_local_image_ctx, &mock_remote_image_ctx, m_threads->timer, &m_threads->timer_lock, "mirror-uuid", &mock_journaler, &m_client_meta, - m_threads->work_queue, ctx); + m_threads->work_queue, &mock_instance_watcher, + ctx); } librbd::ImageCtx *m_remote_image_ctx; @@ -281,6 +310,7 @@ TEST_F(TestMockImageSync, SimpleSync) { librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); journal::MockJournaler mock_journaler; + MockInstanceWatcher mock_instance_watcher; MockImageCopyRequest mock_image_copy_request; MockSnapshotCopyRequest mock_snapshot_copy_request; MockSyncPointCreateRequest mock_sync_point_create_request; @@ -291,6 +321,7 @@ TEST_F(TestMockImageSync, SimpleSync) { expect_test_features(mock_local_image_ctx); InSequence seq; + expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0); expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0); expect_copy_snapshots(mock_snapshot_copy_request, 0); expect_copy_image(mock_image_copy_request, 0); @@ -298,11 +329,12 @@ TEST_F(TestMockImageSync, SimpleSync) { expect_create_object_map(mock_local_image_ctx, mock_object_map); expect_open_object_map(mock_local_image_ctx, *mock_object_map); expect_prune_sync_point(mock_sync_point_prune_request, true, 0); + expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id); C_SaferCond ctx; MockImageSync *request = create_request(mock_remote_image_ctx, - mock_local_image_ctx, - mock_journaler, &ctx); + mock_local_image_ctx, mock_journaler, + mock_instance_watcher, &ctx); request->send(); ASSERT_EQ(0, ctx.wait()); } @@ -311,6 +343,7 @@ TEST_F(TestMockImageSync, RestartSync) { librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); journal::MockJournaler mock_journaler; + MockInstanceWatcher mock_instance_watcher; MockImageCopyRequest mock_image_copy_request; MockSnapshotCopyRequest mock_snapshot_copy_request; MockSyncPointCreateRequest mock_sync_point_create_request; @@ -326,6 +359,7 @@ TEST_F(TestMockImageSync, RestartSync) { expect_test_features(mock_local_image_ctx); InSequence seq; + expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0); expect_prune_sync_point(mock_sync_point_prune_request, false, 0); expect_copy_snapshots(mock_snapshot_copy_request, 0); expect_copy_image(mock_image_copy_request, 0); @@ -333,19 +367,60 @@ TEST_F(TestMockImageSync, RestartSync) { expect_create_object_map(mock_local_image_ctx, mock_object_map); expect_open_object_map(mock_local_image_ctx, *mock_object_map); expect_prune_sync_point(mock_sync_point_prune_request, true, 0); + expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id); C_SaferCond ctx; MockImageSync *request = create_request(mock_remote_image_ctx, - mock_local_image_ctx, - mock_journaler, &ctx); + mock_local_image_ctx, mock_journaler, + mock_instance_watcher, &ctx); request->send(); ASSERT_EQ(0, ctx.wait()); } +TEST_F(TestMockImageSync, CancelNotifySyncRequest) { + librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx); + librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); + journal::MockJournaler mock_journaler; + MockInstanceWatcher mock_instance_watcher; + + InSequence seq; + Context *on_sync_start = nullptr; + C_SaferCond notify_sync_ctx; + EXPECT_CALL(mock_instance_watcher, + notify_sync_request(mock_local_image_ctx.id, _)) + .WillOnce(Invoke([this, &on_sync_start, ¬ify_sync_ctx]( + const std::string &, Context *ctx) { + on_sync_start = ctx; + notify_sync_ctx.complete(0); + })); + EXPECT_CALL(mock_instance_watcher, + cancel_sync_request(mock_local_image_ctx.id)) + .WillOnce(Invoke([this, &on_sync_start](const std::string &) { + EXPECT_NE(nullptr, on_sync_start); + on_sync_start->complete(-ECANCELED); + return true; + })); + + C_SaferCond ctx; + MockImageSync *request = create_request(mock_remote_image_ctx, + mock_local_image_ctx, mock_journaler, + mock_instance_watcher, &ctx); + request->get(); + request->send(); + + // cancel the notify sync request once it starts + ASSERT_EQ(0, notify_sync_ctx.wait()); + request->cancel(); + request->put(); + + ASSERT_EQ(-ECANCELED, ctx.wait()); +} + TEST_F(TestMockImageSync, CancelImageCopy) { librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); journal::MockJournaler mock_journaler; + MockInstanceWatcher mock_instance_watcher; MockImageCopyRequest mock_image_copy_request; MockSnapshotCopyRequest mock_snapshot_copy_request; MockSyncPointCreateRequest mock_sync_point_create_request; @@ -354,6 +429,7 @@ TEST_F(TestMockImageSync, CancelImageCopy) { m_client_meta.sync_points = {{cls::rbd::UserSnapshotNamespace(), "snap1", boost::none}}; InSequence seq; + expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0); expect_prune_sync_point(mock_sync_point_prune_request, false, 0); expect_copy_snapshots(mock_snapshot_copy_request, 0); @@ -362,12 +438,15 @@ TEST_F(TestMockImageSync, CancelImageCopy) { .WillOnce(Invoke([&image_copy_ctx]() { image_copy_ctx.complete(0); })); + expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id, + false); EXPECT_CALL(mock_image_copy_request, cancel()); + expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id); C_SaferCond ctx; MockImageSync *request = create_request(mock_remote_image_ctx, - mock_local_image_ctx, - mock_journaler, &ctx); + mock_local_image_ctx, mock_journaler, + mock_instance_watcher, &ctx); request->get(); request->send(); @@ -384,6 +463,7 @@ TEST_F(TestMockImageSync, CancelAfterCopySnapshots) { librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); journal::MockJournaler mock_journaler; + MockInstanceWatcher mock_instance_watcher; MockSnapshotCopyRequest mock_snapshot_copy_request; MockSyncPointCreateRequest mock_sync_point_create_request; @@ -393,9 +473,10 @@ TEST_F(TestMockImageSync, CancelAfterCopySnapshots) { C_SaferCond ctx; MockImageSync *request = create_request(mock_remote_image_ctx, - mock_local_image_ctx, - mock_journaler, &ctx); + mock_local_image_ctx, mock_journaler, + mock_instance_watcher, &ctx); InSequence seq; + expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0); expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0); EXPECT_CALL(mock_snapshot_copy_request, send()) .WillOnce((DoAll(InvokeWithoutArgs([request]() { @@ -404,7 +485,10 @@ TEST_F(TestMockImageSync, CancelAfterCopySnapshots) { Invoke([this, &mock_snapshot_copy_request]() { m_threads->work_queue->queue(mock_snapshot_copy_request.on_finish, 0); })))); + expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id, + false); EXPECT_CALL(mock_snapshot_copy_request, cancel()); + expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id); request->send(); ASSERT_EQ(-ECANCELED, ctx.wait()); @@ -414,6 +498,7 @@ TEST_F(TestMockImageSync, CancelAfterCopyImage) { librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx); librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx); journal::MockJournaler mock_journaler; + MockInstanceWatcher mock_instance_watcher; MockImageCopyRequest mock_image_copy_request; MockSnapshotCopyRequest mock_snapshot_copy_request; MockSyncPointCreateRequest mock_sync_point_create_request; @@ -425,9 +510,10 @@ TEST_F(TestMockImageSync, CancelAfterCopyImage) { C_SaferCond ctx; MockImageSync *request = create_request(mock_remote_image_ctx, - mock_local_image_ctx, - mock_journaler, &ctx); + mock_local_image_ctx, mock_journaler, + mock_instance_watcher, &ctx); InSequence seq; + expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0); expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0); expect_copy_snapshots(mock_snapshot_copy_request, 0); EXPECT_CALL(mock_image_copy_request, send()) @@ -437,7 +523,10 @@ TEST_F(TestMockImageSync, CancelAfterCopyImage) { Invoke([this, &mock_image_copy_request]() { m_threads->work_queue->queue(mock_image_copy_request.on_finish, 0); })))); + expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id, + false); EXPECT_CALL(mock_image_copy_request, cancel()); + expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id); request->send(); ASSERT_EQ(-ECANCELED, ctx.wait()); diff --git a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc index fe2c11826405e2..24815aeb799aa2 100644 --- a/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc +++ b/src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc @@ -13,12 +13,7 @@ */ #include "test/rbd_mirror/test_mock_fixture.h" -#include "librbd/journal/TypeTraits.h" -#include "test/journal/mock/MockJournaler.h" #include "test/librbd/mock/MockImageCtx.h" -#include "librbd/ImageState.h" -#include "tools/rbd_mirror/Threads.h" -#include "tools/rbd_mirror/ImageSync.h" namespace librbd { @@ -32,73 +27,8 @@ struct MockTestImageCtx : public librbd::MockImageCtx { } // anonymous namespace -namespace journal { - -template <> -struct TypeTraits { - typedef ::journal::MockJournaler Journaler; -}; - -} // namespace journal } // namespace librbd -namespace rbd { -namespace mirror { - -using ::testing::Invoke; - -typedef ImageSync MockImageSync; - -template<> -class ImageSync { -public: - static std::vector instances; - - Context *on_finish; - bool syncing = false; - - static ImageSync* create(librbd::MockTestImageCtx *local_image_ctx, - librbd::MockTestImageCtx *remote_image_ctx, - SafeTimer *timer, Mutex *timer_lock, - const std::string &mirror_uuid, - journal::MockJournaler *journaler, - librbd::journal::MirrorPeerClientMeta *client_meta, - ContextWQ *work_queue, Context *on_finish, - ProgressContext *progress_ctx = nullptr) { - ImageSync *sync = new ImageSync(); - sync->on_finish = on_finish; - - EXPECT_CALL(*sync, send()) - .WillRepeatedly(Invoke([sync]() { - sync->syncing = true; - })); - - return sync; - } - - void finish(int r) { - on_finish->complete(r); - put(); - } - - void get() { - instances.push_back(this); - } - - void put() { delete this; } - - MOCK_METHOD0(cancel, void()); - MOCK_METHOD0(send, void()); - -}; - - -std::vector MockImageSync::instances; - -} // namespace mirror -} // namespace rbd - - // template definitions #include "tools/rbd_mirror/ImageSyncThrottler.cc" @@ -109,300 +39,144 @@ class TestMockImageSyncThrottler : public TestMockFixture { public: typedef ImageSyncThrottler MockImageSyncThrottler; - void SetUp() override { - TestMockFixture::SetUp(); - - librbd::RBD rbd; - ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, m_image_size)); - ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, &m_remote_image_ctx)); - - ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size)); - ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx)); - - mock_sync_throttler = new MockImageSyncThrottler(); - - m_mock_local_image_ctx = new librbd::MockTestImageCtx(*m_local_image_ctx); - m_mock_remote_image_ctx = new librbd::MockTestImageCtx(*m_remote_image_ctx); - m_mock_journaler = new journal::MockJournaler(); - } - - void TearDown() override { - MockImageSync::instances.clear(); - delete mock_sync_throttler; - delete m_mock_local_image_ctx; - delete m_mock_remote_image_ctx; - delete m_mock_journaler; - TestMockFixture::TearDown(); - } - - void start_sync(const std::string& image_id, Context *ctx) { - m_mock_local_image_ctx->id = image_id; - mock_sync_throttler->start_sync(m_mock_local_image_ctx, - m_mock_remote_image_ctx, - m_threads->timer, - &m_threads->timer_lock, - "mirror_uuid", - m_mock_journaler, - &m_client_meta, - m_threads->work_queue, - ctx); - } - - void cancel(const std::string& mirror_uuid, MockImageSync *sync, - bool running=true) { - if (running) { - EXPECT_CALL(*sync, cancel()) - .WillOnce(Invoke([sync]() { - sync->finish(-ECANCELED); - })); - } else { - EXPECT_CALL(*sync, cancel()).Times(0); - } - mock_sync_throttler->cancel_sync(mirror_uuid); - } - - librbd::ImageCtx *m_remote_image_ctx; - librbd::ImageCtx *m_local_image_ctx; - librbd::MockTestImageCtx *m_mock_local_image_ctx; - librbd::MockTestImageCtx *m_mock_remote_image_ctx; - journal::MockJournaler *m_mock_journaler; - librbd::journal::MirrorPeerClientMeta m_client_meta; - MockImageSyncThrottler *mock_sync_throttler; }; TEST_F(TestMockImageSyncThrottler, Single_Sync) { - C_SaferCond ctx; - start_sync("image_id", &ctx); - - ASSERT_EQ(1u, MockImageSync::instances.size()); - MockImageSync *sync = MockImageSync::instances[0]; - ASSERT_EQ(true, sync->syncing); - sync->finish(0); - ASSERT_EQ(0, ctx.wait()); + MockImageSyncThrottler throttler; + C_SaferCond on_start; + throttler.start_op("id", &on_start); + ASSERT_EQ(0, on_start.wait()); + throttler.finish_op("id"); } TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) { - mock_sync_throttler->set_max_concurrent_syncs(2); - - C_SaferCond ctx1; - start_sync("image_id_1", &ctx1); - C_SaferCond ctx2; - start_sync("image_id_2", &ctx2); - C_SaferCond ctx3; - start_sync("image_id_3", &ctx3); - C_SaferCond ctx4; - start_sync("image_id_4", &ctx4); - - ASSERT_EQ(4u, MockImageSync::instances.size()); - - MockImageSync *sync1 = MockImageSync::instances[0]; - ASSERT_TRUE(sync1->syncing); - - MockImageSync *sync2 = MockImageSync::instances[1]; - ASSERT_TRUE(sync2->syncing); - - MockImageSync *sync3 = MockImageSync::instances[2]; - ASSERT_FALSE(sync3->syncing); - - MockImageSync *sync4 = MockImageSync::instances[3]; - ASSERT_FALSE(sync4->syncing); - - sync1->finish(0); - ASSERT_EQ(0, ctx1.wait()); - - ASSERT_TRUE(sync3->syncing); - sync3->finish(-EINVAL); - ASSERT_EQ(-EINVAL, ctx3.wait()); - - ASSERT_TRUE(sync4->syncing); - - sync2->finish(0); - ASSERT_EQ(0, ctx2.wait()); - - sync4->finish(0); - ASSERT_EQ(0, ctx4.wait()); + MockImageSyncThrottler throttler; + throttler.set_max_concurrent_syncs(2); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("id2", &on_start2); + C_SaferCond on_start3; + throttler.start_op("id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("id4", &on_start4); + + ASSERT_EQ(0, on_start2.wait()); + throttler.finish_op("id2"); + ASSERT_EQ(0, on_start3.wait()); + throttler.finish_op("id3"); + ASSERT_EQ(0, on_start1.wait()); + throttler.finish_op("id1"); + ASSERT_EQ(0, on_start4.wait()); + throttler.finish_op("id4"); } TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) { - C_SaferCond ctx1; - start_sync("image_id_1", &ctx1); - C_SaferCond ctx2; - start_sync("image_id_2", &ctx2); - - ASSERT_EQ(2u, MockImageSync::instances.size()); - - MockImageSync *sync1 = MockImageSync::instances[0]; - ASSERT_TRUE(sync1->syncing); - - MockImageSync *sync2 = MockImageSync::instances[1]; - ASSERT_TRUE(sync2->syncing); - - cancel("image_id_2", sync2); - ASSERT_EQ(-ECANCELED, ctx2.wait()); - - sync1->finish(0); - ASSERT_EQ(0, ctx1.wait()); + MockImageSyncThrottler throttler; + C_SaferCond on_start; + throttler.start_op("id", &on_start); + ASSERT_EQ(0, on_start.wait()); + ASSERT_FALSE(throttler.cancel_op("id")); + throttler.finish_op("id"); } TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) { - mock_sync_throttler->set_max_concurrent_syncs(1); - - C_SaferCond ctx1; - start_sync("image_id_1", &ctx1); - C_SaferCond ctx2; - start_sync("image_id_2", &ctx2); - - ASSERT_EQ(2u, MockImageSync::instances.size()); - - MockImageSync *sync1 = MockImageSync::instances[0]; - ASSERT_TRUE(sync1->syncing); - - MockImageSync *sync2 = MockImageSync::instances[1]; - ASSERT_FALSE(sync2->syncing); - - cancel("image_id_2", sync2, false); - ASSERT_EQ(-ECANCELED, ctx2.wait()); - - sync1->finish(0); - ASSERT_EQ(0, ctx1.wait()); + MockImageSyncThrottler throttler; + throttler.set_max_concurrent_syncs(1); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("id2", &on_start2); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_TRUE(throttler.cancel_op("id2")); + ASSERT_EQ(-ECANCELED, on_start2.wait()); + throttler.finish_op("id1"); } -TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) { - mock_sync_throttler->set_max_concurrent_syncs(1); - - C_SaferCond ctx1; - start_sync("image_id_1", &ctx1); - C_SaferCond ctx2; - start_sync("image_id_2", &ctx2); - - ASSERT_EQ(2u, MockImageSync::instances.size()); - MockImageSync *sync1 = MockImageSync::instances[0]; - ASSERT_TRUE(sync1->syncing); - - MockImageSync *sync2 = MockImageSync::instances[1]; - ASSERT_FALSE(sync2->syncing); - - cancel("image_id_1", sync1); - ASSERT_EQ(-ECANCELED, ctx1.wait()); - - ASSERT_TRUE(sync2->syncing); - sync2->finish(0); - ASSERT_EQ(0, ctx2.wait()); +TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) { + MockImageSyncThrottler throttler; + throttler.set_max_concurrent_syncs(1); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("id2", &on_start2); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_FALSE(throttler.cancel_op("id1")); + throttler.finish_op("id1"); + ASSERT_EQ(0, on_start2.wait()); + throttler.finish_op("id2"); } TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) { - mock_sync_throttler->set_max_concurrent_syncs(2); - - C_SaferCond ctx1; - start_sync("image_id_1", &ctx1); - C_SaferCond ctx2; - start_sync("image_id_2", &ctx2); - C_SaferCond ctx3; - start_sync("image_id_3", &ctx3); - C_SaferCond ctx4; - start_sync("image_id_4", &ctx4); - C_SaferCond ctx5; - start_sync("image_id_5", &ctx5); - - ASSERT_EQ(5u, MockImageSync::instances.size()); - - MockImageSync *sync1 = MockImageSync::instances[0]; - ASSERT_TRUE(sync1->syncing); - - MockImageSync *sync2 = MockImageSync::instances[1]; - ASSERT_TRUE(sync2->syncing); - - MockImageSync *sync3 = MockImageSync::instances[2]; - ASSERT_FALSE(sync3->syncing); - - MockImageSync *sync4 = MockImageSync::instances[3]; - ASSERT_FALSE(sync4->syncing); - - MockImageSync *sync5 = MockImageSync::instances[4]; - ASSERT_FALSE(sync5->syncing); - - mock_sync_throttler->set_max_concurrent_syncs(4); - - ASSERT_TRUE(sync3->syncing); - ASSERT_TRUE(sync4->syncing); - ASSERT_FALSE(sync5->syncing); - - sync1->finish(0); - ASSERT_EQ(0, ctx1.wait()); - - ASSERT_TRUE(sync5->syncing); - sync5->finish(-EINVAL); - ASSERT_EQ(-EINVAL, ctx5.wait()); - - sync2->finish(0); - ASSERT_EQ(0, ctx2.wait()); - - sync3->finish(0); - ASSERT_EQ(0, ctx3.wait()); - - sync4->finish(0); - ASSERT_EQ(0, ctx4.wait()); + MockImageSyncThrottler throttler; + throttler.set_max_concurrent_syncs(2); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("id2", &on_start2); + C_SaferCond on_start3; + throttler.start_op("id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("id4", &on_start4); + C_SaferCond on_start5; + throttler.start_op("id5", &on_start5); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_EQ(0, on_start2.wait()); + + throttler.set_max_concurrent_syncs(4); + + ASSERT_EQ(0, on_start3.wait()); + ASSERT_EQ(0, on_start4.wait()); + + throttler.finish_op("id4"); + ASSERT_EQ(0, on_start5.wait()); + + throttler.finish_op("id1"); + throttler.finish_op("id2"); + throttler.finish_op("id3"); + throttler.finish_op("id5"); } TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) { - mock_sync_throttler->set_max_concurrent_syncs(4); - - C_SaferCond ctx1; - start_sync("image_id_1", &ctx1); - C_SaferCond ctx2; - start_sync("image_id_2", &ctx2); - C_SaferCond ctx3; - start_sync("image_id_3", &ctx3); - C_SaferCond ctx4; - start_sync("image_id_4", &ctx4); - C_SaferCond ctx5; - start_sync("image_id_5", &ctx5); - - ASSERT_EQ(5u, MockImageSync::instances.size()); - - MockImageSync *sync1 = MockImageSync::instances[0]; - ASSERT_TRUE(sync1->syncing); - - MockImageSync *sync2 = MockImageSync::instances[1]; - ASSERT_TRUE(sync2->syncing); - - MockImageSync *sync3 = MockImageSync::instances[2]; - ASSERT_TRUE(sync3->syncing); - - MockImageSync *sync4 = MockImageSync::instances[3]; - ASSERT_TRUE(sync4->syncing); - - MockImageSync *sync5 = MockImageSync::instances[4]; - ASSERT_FALSE(sync5->syncing); - - mock_sync_throttler->set_max_concurrent_syncs(2); - - ASSERT_FALSE(sync5->syncing); - - sync1->finish(0); - ASSERT_EQ(0, ctx1.wait()); - - ASSERT_FALSE(sync5->syncing); - - sync2->finish(0); - ASSERT_EQ(0, ctx2.wait()); - - ASSERT_FALSE(sync5->syncing); - - sync3->finish(0); - ASSERT_EQ(0, ctx3.wait()); - - ASSERT_TRUE(sync5->syncing); - - sync4->finish(0); - ASSERT_EQ(0, ctx4.wait()); - - sync5->finish(0); - ASSERT_EQ(0, ctx5.wait()); + MockImageSyncThrottler throttler; + throttler.set_max_concurrent_syncs(4); + + C_SaferCond on_start1; + throttler.start_op("id1", &on_start1); + C_SaferCond on_start2; + throttler.start_op("id2", &on_start2); + C_SaferCond on_start3; + throttler.start_op("id3", &on_start3); + C_SaferCond on_start4; + throttler.start_op("id4", &on_start4); + C_SaferCond on_start5; + throttler.start_op("id5", &on_start5); + + ASSERT_EQ(0, on_start1.wait()); + ASSERT_EQ(0, on_start2.wait()); + ASSERT_EQ(0, on_start3.wait()); + ASSERT_EQ(0, on_start4.wait()); + + throttler.set_max_concurrent_syncs(2); + + throttler.finish_op("id1"); + throttler.finish_op("id2"); + throttler.finish_op("id3"); + + ASSERT_EQ(0, on_start5.wait()); + + throttler.finish_op("id4"); + throttler.finish_op("id5"); } - } // namespace mirror } // namespace rbd diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc index 53a9d0b6edd388..f2a1d1b34f1f82 100644 --- a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc +++ b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc @@ -4,7 +4,7 @@ #include "test/librbd/mock/MockImageCtx.h" #include "test/rbd_mirror/test_mock_fixture.h" #include "tools/rbd_mirror/ImageReplayer.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" +#include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/InstanceReplayer.h" #include "tools/rbd_mirror/Threads.h" @@ -37,6 +37,10 @@ struct Threads { } }; +template<> +struct InstanceWatcher { +}; + template<> struct ImageReplayer { static ImageReplayer* s_instance; @@ -45,7 +49,7 @@ struct ImageReplayer { static ImageReplayer *create( Threads *threads, std::shared_ptr image_deleter, - ImageSyncThrottlerRef image_sync_throttler, + InstanceWatcher *instance_watcher, RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id, const std::string &global_image_id) { assert(s_instance != nullptr); @@ -83,14 +87,6 @@ struct ImageReplayer { MOCK_METHOD0(is_blacklisted, bool()); }; -template<> -struct ImageSyncThrottler { - ImageSyncThrottler() { - } - virtual ~ImageSyncThrottler() { - } -}; - ImageReplayer* ImageReplayer::s_instance = nullptr; } // namespace mirror @@ -112,6 +108,7 @@ class TestMockInstanceReplayer : public TestMockFixture { public: typedef ImageReplayer MockImageReplayer; typedef InstanceReplayer MockInstanceReplayer; + typedef InstanceWatcher MockInstanceWatcher; typedef Threads MockThreads; void SetUp() override { @@ -122,8 +119,6 @@ class TestMockInstanceReplayer : public TestMockFixture { m_image_deleter.reset( new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock)); - m_image_sync_throttler.reset( - new rbd::mirror::ImageSyncThrottler()); } void TearDown() override { @@ -133,14 +128,13 @@ class TestMockInstanceReplayer : public TestMockFixture { MockThreads *m_mock_threads; std::shared_ptr m_image_deleter; - std::shared_ptr> - m_image_sync_throttler; }; TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) { + MockInstanceWatcher mock_instance_watcher; MockImageReplayer mock_image_replayer; MockInstanceReplayer instance_replayer( - m_mock_threads, m_image_deleter, m_image_sync_throttler, + m_mock_threads, m_image_deleter, rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)), "local_mirror_uuid", m_local_io_ctx.get_id()); @@ -166,8 +160,9 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) { .WillOnce(Return(true)); EXPECT_CALL(mock_image_replayer, start(nullptr, false)); - instance_replayer.acquire_image(global_image_id, "remote_mirror_uuid", - "remote_image_id", &on_acquire); + instance_replayer.acquire_image(&mock_instance_watcher, global_image_id, + "remote_mirror_uuid", "remote_image_id", + &on_acquire); ASSERT_EQ(0, on_acquire.wait()); // Release diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc index 5a94e5a65ab428..bdd56bfc39dfc3 100644 --- a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc +++ b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc @@ -9,6 +9,7 @@ #include "test/librbd/mock/MockImageCtx.h" #include "test/rbd_mirror/test_mock_fixture.h" #include "tools/rbd_mirror/InstanceReplayer.h" +#include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" @@ -75,12 +76,40 @@ struct Threads { template <> struct InstanceReplayer { - MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &, + MOCK_METHOD5(acquire_image, void(InstanceWatcher *, + const std::string &, const std::string &, const std::string &, Context *)); MOCK_METHOD5(release_image, void(const std::string &, const std::string &, const std::string &, bool, Context *)); }; +template <> +struct ImageSyncThrottler { + static ImageSyncThrottler* s_instance; + + static ImageSyncThrottler *create() { + assert(s_instance != nullptr); + return s_instance; + } + + ImageSyncThrottler() { + assert(s_instance == nullptr); + s_instance = this; + } + + virtual ~ImageSyncThrottler() { + assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(destroy, void()); + MOCK_METHOD1(drain, void(int)); + MOCK_METHOD2(start_op, void(const std::string &, Context *)); + MOCK_METHOD1(finish_op, void(const std::string &)); +}; + +ImageSyncThrottler* ImageSyncThrottler::s_instance = nullptr; + } // namespace mirror } // namespace rbd @@ -335,16 +364,18 @@ TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) { ASSERT_EQ(0, instance_watcher2->init()); // Acquire Image on the the same instance - EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _)) - .WillOnce(WithArg<3>(CompleteContext(0))); + EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid", + "uuid", "id", _)) + .WillOnce(WithArg<4>(CompleteContext(0))); C_SaferCond on_acquire1; instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id", &on_acquire1); ASSERT_EQ(0, on_acquire1.wait()); // Acquire Image on the other instance - EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _)) - .WillOnce(WithArg<3>(CompleteContext(0))); + EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid", + "uuid", "id", _)) + .WillOnce(WithArg<4>(CompleteContext(0))); C_SaferCond on_acquire2; instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id", &on_acquire2); @@ -455,5 +486,333 @@ TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) { delete instance_watcher; } +class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher { +public: + typedef ImageSyncThrottler MockImageSyncThrottler; + + MockManagedLock mock_managed_lock; + MockImageSyncThrottler mock_image_sync_throttler; + std::string instance_id1; + std::string instance_id2; + + librados::Rados cluster; + librados::IoCtx io_ctx2; + + MockInstanceWatcher *instance_watcher1; + MockInstanceWatcher *instance_watcher2; + + void SetUp() override { + TestMockInstanceWatcher::SetUp(); + + instance_id1 = m_instance_id; + librados::IoCtx& io_ctx1 = m_local_io_ctx; + librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); + instance_watcher1 = MockInstanceWatcher::create(io_ctx1, + m_mock_threads->work_queue, + nullptr); + EXPECT_EQ("", connect_cluster_pp(cluster)); + EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2)); + instance_id2 = stringify(io_ctx2.get_instance_id()); + librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); + instance_watcher2 = MockInstanceWatcher::create(io_ctx2, + m_mock_threads->work_queue, + nullptr); + InSequence seq; + + // Init instance watcher 1 (leader) + expect_register_instance(mock_io_ctx1, 0); + expect_register_watch(mock_io_ctx1, instance_id1); + expect_acquire_lock(mock_managed_lock, 0); + EXPECT_EQ(0, instance_watcher1->init()); + instance_watcher1->handle_acquire_leader(); + + // Init instance watcher 2 + expect_register_instance(mock_io_ctx2, 0); + expect_register_watch(mock_io_ctx2, instance_id2); + expect_acquire_lock(mock_managed_lock, 0); + EXPECT_EQ(0, instance_watcher2->init()); + instance_watcher2->handle_update_leader(instance_id1); + } + + void TearDown() override { + librados::IoCtx& io_ctx1 = m_local_io_ctx; + librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); + librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); + + InSequence seq; + + expect_throttler_destroy(); + instance_watcher1->handle_release_leader(); + + // Shutdown instance watcher 1 + expect_release_lock(mock_managed_lock, 0); + expect_unregister_watch(mock_io_ctx1); + expect_unregister_instance(mock_io_ctx1, 0); + instance_watcher1->shut_down(); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher1; + + // Shutdown instance watcher 2 + expect_release_lock(mock_managed_lock, 0); + expect_unregister_watch(mock_io_ctx2); + expect_unregister_instance(mock_io_ctx2, 0); + instance_watcher2->shut_down(); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher2; + + TestMockInstanceWatcher::TearDown(); + } + + void expect_throttler_destroy( + std::vector *throttler_queue = nullptr) { + EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE)) + .WillOnce(Invoke([throttler_queue] (int r) { + if (throttler_queue != nullptr) { + for (auto ctx : *throttler_queue) { + ctx->complete(r); + } + } + })); + EXPECT_CALL(mock_image_sync_throttler, destroy()); + } + + void expect_throttler_start_op(const std::string &sync_id, + Context *on_call = nullptr, + Context **on_start_ctx = nullptr) { + EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _)) + .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &, + Context *ctx) { + if (on_call != nullptr) { + on_call->complete(0); + } + if (on_start_ctx != nullptr) { + *on_start_ctx = ctx; + } else { + ctx->complete(0); + } + })); + } + + void expect_throttler_finish_op(const std::string &sync_id, + Context *on_finish) { + EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id")) + .WillOnce(Invoke([on_finish](const std::string &) { + on_finish->complete(0); + })); + } +}; + +TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) { + InSequence seq; + + expect_throttler_start_op("sync_id"); + C_SaferCond on_start; + instance_watcher1->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start.wait()); + + C_SaferCond on_finish; + expect_throttler_finish_op("sync_id", &on_finish); + instance_watcher1->notify_sync_complete("sync_id"); + ASSERT_EQ(0, on_finish.wait()); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) { + InSequence seq; + + expect_throttler_start_op("sync_id"); + C_SaferCond on_start; + instance_watcher1->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start.wait()); + + ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id")); + + C_SaferCond on_finish; + expect_throttler_finish_op("sync_id", &on_finish); + instance_watcher1->notify_sync_complete("sync_id"); + ASSERT_EQ(0, on_finish.wait()); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) { + InSequence seq; + + expect_throttler_start_op("sync_id"); + C_SaferCond on_start; + instance_watcher2->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start.wait()); + + C_SaferCond on_finish; + expect_throttler_finish_op("sync_id", &on_finish); + instance_watcher2->notify_sync_complete("sync_id"); + ASSERT_EQ(0, on_finish.wait()); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) { + InSequence seq; + + expect_throttler_start_op("sync_id"); + C_SaferCond on_start; + instance_watcher2->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start.wait()); + + ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id")); + + C_SaferCond on_finish; + expect_throttler_finish_op("sync_id", &on_finish); + instance_watcher2->notify_sync_complete("sync_id"); + ASSERT_EQ(0, on_finish.wait()); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) { + InSequence seq; + + C_SaferCond on_start_op_called; + Context *on_start_ctx; + expect_throttler_start_op("sync_id", &on_start_op_called, + &on_start_ctx); + C_SaferCond on_start; + instance_watcher2->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start_op_called.wait()); + + ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id")); + // emulate watcher timeout + on_start_ctx->complete(-ETIMEDOUT); + ASSERT_EQ(-ECANCELED, on_start.wait()); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) { + // start sync when previous notification is still in flight + + InSequence seq; + + expect_throttler_start_op("sync_id"); + C_SaferCond on_start1; + instance_watcher2->notify_sync_request("sync_id", &on_start1); + ASSERT_EQ(0, on_start1.wait()); + + C_SaferCond on_start2; + EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id")) + .WillOnce(Invoke([this, &on_start2](const std::string &) { + instance_watcher2->notify_sync_request("sync_id", &on_start2); + })); + expect_throttler_start_op("sync_id"); + instance_watcher2->notify_sync_complete("sync_id"); + + ASSERT_EQ(0, on_start2.wait()); + C_SaferCond on_finish; + expect_throttler_finish_op("sync_id", &on_finish); + instance_watcher2->notify_sync_complete("sync_id"); + ASSERT_EQ(0, on_finish.wait()); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) { + InSequence seq; + + expect_throttler_destroy(); + instance_watcher1->handle_release_leader(); + instance_watcher1->handle_acquire_leader(); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) { + InSequence seq; + + expect_throttler_destroy(); + instance_watcher1->handle_release_leader(); + instance_watcher2->handle_acquire_leader(); + + expect_throttler_start_op("sync_id"); + C_SaferCond on_start; + instance_watcher2->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start.wait()); + expect_throttler_destroy(); + instance_watcher2->handle_release_leader(); + instance_watcher2->notify_sync_complete("sync_id"); + + instance_watcher1->handle_acquire_leader(); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) { + InSequence seq; + + C_SaferCond on_start_op_called; + Context *on_start_ctx; + expect_throttler_start_op("sync_id", &on_start_op_called, + &on_start_ctx); + C_SaferCond on_start; + instance_watcher1->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start_op_called.wait()); + + std::vector throttler_queue = {on_start_ctx}; + expect_throttler_destroy(&throttler_queue); + instance_watcher1->handle_release_leader(); + instance_watcher2->handle_acquire_leader(); + instance_watcher1->handle_update_leader(instance_id2); + + expect_throttler_start_op("sync_id"); + ASSERT_EQ(0, on_start.wait()); + C_SaferCond on_finish; + expect_throttler_finish_op("sync_id", &on_finish); + instance_watcher1->notify_sync_complete("sync_id"); + ASSERT_EQ(0, on_finish.wait()); + + expect_throttler_destroy(); + instance_watcher2->handle_release_leader(); + instance_watcher1->handle_acquire_leader(); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) { + InSequence seq; + + expect_throttler_destroy(); + instance_watcher1->handle_release_leader(); + instance_watcher2->handle_acquire_leader(); + instance_watcher1->handle_update_leader(instance_id2); + + expect_throttler_start_op("sync_id"); + C_SaferCond on_start; + instance_watcher1->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start.wait()); + + expect_throttler_destroy(); + instance_watcher2->handle_release_leader(); + instance_watcher1->handle_acquire_leader(); + instance_watcher2->handle_update_leader(instance_id2); + + instance_watcher1->notify_sync_complete("sync_id"); +} + +TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) { + InSequence seq; + + C_SaferCond on_start_op_called; + Context *on_start_ctx; + expect_throttler_start_op("sync_id", &on_start_op_called, + &on_start_ctx); + C_SaferCond on_start; + instance_watcher2->notify_sync_request("sync_id", &on_start); + ASSERT_EQ(0, on_start_op_called.wait()); + + std::vector throttler_queue = {on_start_ctx}; + expect_throttler_destroy(&throttler_queue); + instance_watcher1->handle_release_leader(); + + EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _)) + .WillOnce(WithArg<1>(CompleteContext(0))); + instance_watcher2->handle_acquire_leader(); + instance_watcher1->handle_update_leader(instance_id2); + + ASSERT_EQ(0, on_start.wait()); + + C_SaferCond on_finish; + expect_throttler_finish_op("sync_id", &on_finish); + instance_watcher2->notify_sync_complete("sync_id"); + ASSERT_EQ(0, on_finish.wait()); + + expect_throttler_destroy(); + instance_watcher2->handle_release_leader(); + instance_watcher1->handle_acquire_leader(); +} + } // namespace mirror } // namespace rbd diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc index bb8935193b39e5..2f458e6ad6411d 100644 --- a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -50,6 +50,7 @@ struct MockManagedLock { MOCK_CONST_METHOD0(is_shutdown, bool()); MOCK_CONST_METHOD0(is_state_post_acquiring, bool()); + MOCK_CONST_METHOD0(is_state_pre_releasing, bool()); MOCK_CONST_METHOD0(is_state_locked, bool()); }; @@ -147,6 +148,10 @@ struct ManagedLock { return MockManagedLock::get_instance().is_state_post_acquiring(); } + bool is_state_pre_releasing() const { + return MockManagedLock::get_instance().is_state_pre_releasing(); + } + bool is_state_locked() const { return MockManagedLock::get_instance().is_state_locked(); } @@ -264,6 +269,8 @@ struct MockListener : public LeaderWatcher::Listener { MOCK_METHOD1(post_acquire_handler, void(Context *)); MOCK_METHOD1(pre_release_handler, void(Context *)); + + MOCK_METHOD1(update_leader_handler, void(const std::string &)); }; MockListener *MockListener::s_instance = nullptr; @@ -371,6 +378,8 @@ class TestMockLeaderWatcher : public TestMockFixture { .Times(AtLeast(0)).WillRepeatedly(Return(false)); EXPECT_CALL(mock_managed_lock, is_state_locked()) .Times(AtLeast(0)).WillRepeatedly(Return(false)); + EXPECT_CALL(mock_managed_lock, is_state_pre_releasing()) + .Times(AtLeast(0)).WillRepeatedly(Return(false)); } void expect_notify_heartbeat(MockManagedLock &mock_managed_lock, @@ -627,6 +636,7 @@ TEST_F(TestMockLeaderWatcher, Break) { expect_is_shutdown(mock_managed_lock); expect_is_leader(mock_managed_lock); expect_destroy(mock_managed_lock); + EXPECT_CALL(listener, update_leader_handler(_)); InSequence seq; diff --git a/src/tools/rbd_mirror/BaseRequest.h b/src/tools/rbd_mirror/BaseRequest.h index 91e4fbaa4e0d0b..f865f600fd8fa0 100644 --- a/src/tools/rbd_mirror/BaseRequest.h +++ b/src/tools/rbd_mirror/BaseRequest.h @@ -20,7 +20,7 @@ class BaseRequest : public RefCountedObject { virtual void cancel() {} protected: - void finish(int r) { + virtual void finish(int r) { if (m_cct) { lsubdout(m_cct, rbd_mirror, 20) << m_name << "::finish: r=" << r << dendl; } diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 3276dba3b8f638..da01a7e5ef3ba2 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -21,7 +21,6 @@ #include "librbd/Utils.h" #include "librbd/journal/Replay.h" #include "ImageReplayer.h" -#include "ImageSync.h" #include "Threads.h" #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" @@ -267,14 +266,14 @@ void ImageReplayer::RemoteJournalerListener::handle_update( template ImageReplayer::ImageReplayer(Threads *threads, shared_ptr image_deleter, - ImageSyncThrottlerRef image_sync_throttler, + InstanceWatcher *instance_watcher, RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id, const std::string &global_image_id) : m_threads(threads), m_image_deleter(image_deleter), - m_image_sync_throttler(image_sync_throttler), + m_instance_watcher(instance_watcher), m_local(local), m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), @@ -463,7 +462,7 @@ void ImageReplayer::bootstrap() { ImageReplayer, &ImageReplayer::handle_bootstrap>(this); BootstrapRequest *request = BootstrapRequest::create( - m_local_ioctx, m_remote_image.io_ctx, m_image_sync_throttler, + m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher, &m_local_image_ctx, m_local_image_id, m_remote_image.image_id, m_global_image_id, m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid, diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 72f03f77b6cfb3..e74c7c46ddc1b8 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -47,6 +47,7 @@ namespace journal { template class Replay; } namespace rbd { namespace mirror { +template struct InstanceWatcher; template struct Threads; namespace image_replayer { template class BootstrapRequest; } @@ -73,10 +74,10 @@ class ImageReplayer { static ImageReplayer *create( Threads *threads, std::shared_ptr image_deleter, - ImageSyncThrottlerRef image_sync_throttler, + InstanceWatcher *instance_watcher, RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id, const std::string &global_image_id) { - return new ImageReplayer(threads, image_deleter, image_sync_throttler, + return new ImageReplayer(threads, image_deleter, instance_watcher, local, local_mirror_uuid, local_pool_id, global_image_id); } @@ -86,7 +87,7 @@ class ImageReplayer { ImageReplayer(Threads *threads, std::shared_ptr image_deleter, - ImageSyncThrottlerRef image_sync_throttler, + InstanceWatcher *instance_watcher, RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id, const std::string &global_image_id); virtual ~ImageReplayer(); @@ -283,7 +284,7 @@ class ImageReplayer { Threads *m_threads; std::shared_ptr m_image_deleter; - ImageSyncThrottlerRef m_image_sync_throttler; + InstanceWatcher *m_instance_watcher; RemoteImages m_remote_images; RemoteImage m_remote_image; diff --git a/src/tools/rbd_mirror/ImageSync.cc b/src/tools/rbd_mirror/ImageSync.cc index 9c898fdbb628cc..5edc3bccd72b58 100644 --- a/src/tools/rbd_mirror/ImageSync.cc +++ b/src/tools/rbd_mirror/ImageSync.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "ImageSync.h" +#include "InstanceWatcher.h" #include "ProgressContext.h" #include "common/errno.h" #include "journal/Journaler.h" @@ -32,13 +33,15 @@ ImageSync::ImageSync(I *local_image_ctx, I *remote_image_ctx, SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid, Journaler *journaler, MirrorPeerClientMeta *client_meta, - ContextWQ *work_queue, Context *on_finish, - ProgressContext *progress_ctx) + ContextWQ *work_queue, + InstanceWatcher *instance_watcher, + Context *on_finish, ProgressContext *progress_ctx) : BaseRequest("rbd::mirror::ImageSync", local_image_ctx->cct, on_finish), m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx), m_timer(timer), m_timer_lock(timer_lock), m_mirror_uuid(mirror_uuid), m_journaler(journaler), m_client_meta(client_meta), - m_work_queue(work_queue), m_progress_ctx(progress_ctx), + m_work_queue(work_queue), m_instance_watcher(instance_watcher), + m_progress_ctx(progress_ctx), m_lock(unique_lock_name("ImageSync::m_lock", this)) { } @@ -50,7 +53,7 @@ ImageSync::~ImageSync() { template void ImageSync::send() { - send_prune_catch_up_sync_point(); + send_notify_sync_request(); } template @@ -61,6 +64,10 @@ void ImageSync::cancel() { m_canceled = true; + if (m_instance_watcher->cancel_sync_request(m_local_image_ctx->id)) { + return; + } + if (m_snapshot_copy_request != nullptr) { m_snapshot_copy_request->cancel(); } @@ -70,6 +77,29 @@ void ImageSync::cancel() { } } +template +void ImageSync::send_notify_sync_request() { + update_progress("NOTIFY_SYNC_REQUEST"); + + dout(20) << dendl; + + Context *ctx = create_context_callback< + ImageSync, &ImageSync::handle_notify_sync_request>(this); + m_instance_watcher->notify_sync_request(m_local_image_ctx->id, ctx); +} + +template +void ImageSync::handle_notify_sync_request(int r) { + dout(20) << ": r=" << r << dendl; + + if (r < 0) { + BaseRequest::finish(r); + return; + } + + send_prune_catch_up_sync_point(); +} + template void ImageSync::send_prune_catch_up_sync_point() { update_progress("PRUNE_CATCH_UP_SYNC_POINT"); @@ -348,6 +378,14 @@ void ImageSync::update_progress(const std::string &description) { } } +template +void ImageSync::finish(int r) { + dout(20) << ": r=" << r << dendl; + + m_instance_watcher->notify_sync_complete(m_local_image_ctx->id); + BaseRequest::finish(r); +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/ImageSync.h b/src/tools/rbd_mirror/ImageSync.h index 42bbcc51d4fbcb..ebb156ba32f96f 100644 --- a/src/tools/rbd_mirror/ImageSync.h +++ b/src/tools/rbd_mirror/ImageSync.h @@ -24,6 +24,8 @@ namespace mirror { class ProgressContext; +template class InstanceWatcher; + namespace image_sync { template class ImageCopyRequest; } namespace image_sync { template class SnapshotCopyRequest; } @@ -39,23 +41,28 @@ class ImageSync : public BaseRequest { Mutex *timer_lock, const std::string &mirror_uuid, Journaler *journaler, MirrorPeerClientMeta *client_meta, - ContextWQ *work_queue, Context *on_finish, - ProgressContext *progress_ctx = nullptr) { + ContextWQ *work_queue, + InstanceWatcher *instance_watcher, + Context *on_finish, + ProgressContext *progress_ctx = nullptr) { return new ImageSync(local_image_ctx, remote_image_ctx, timer, timer_lock, mirror_uuid, journaler, client_meta, work_queue, - on_finish, progress_ctx); + instance_watcher, on_finish, progress_ctx); } ImageSync(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx, SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid, Journaler *journaler, MirrorPeerClientMeta *client_meta, - ContextWQ *work_queue, Context *on_finish, - ProgressContext *progress_ctx = nullptr); + ContextWQ *work_queue, InstanceWatcher *instance_watcher, + Context *on_finish, ProgressContext *progress_ctx = nullptr); ~ImageSync() override; void send() override; void cancel() override; +protected: + void finish(int r) override; + private: /** * @verbatim @@ -63,6 +70,9 @@ class ImageSync : public BaseRequest { * * | * v + * NOTIFY_SYNC_REQUEST + * | + * v * PRUNE_CATCH_UP_SYNC_POINT * | * v @@ -100,6 +110,7 @@ class ImageSync : public BaseRequest { Journaler *m_journaler; MirrorPeerClientMeta *m_client_meta; ContextWQ *m_work_queue; + InstanceWatcher *m_instance_watcher; ProgressContext *m_progress_ctx; SnapMap m_snap_map; @@ -111,6 +122,9 @@ class ImageSync : public BaseRequest { image_sync::ImageCopyRequest *m_image_copy_request = nullptr; decltype(ImageCtxT::object_map) m_object_map = nullptr; + void send_notify_sync_request(); + void handle_notify_sync_request(int r); + void send_prune_catch_up_sync_point(); void handle_prune_catch_up_sync_point(int r); diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc index 48e54a5ad14306..e5d08cea5f48e8 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.cc +++ b/src/tools/rbd_mirror/ImageSyncThrottler.cc @@ -13,8 +13,9 @@ */ #include "ImageSyncThrottler.h" -#include "ImageSync.h" -#include "common/ceph_context.h" +#include "common/Formatter.h" +#include "common/debug.h" +#include "common/errno.h" #include "librbd/Utils.h" #define dout_context g_ceph_context @@ -22,225 +23,172 @@ #undef dout_prefix #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \ << " " << __func__ << ": " -using std::unique_ptr; -using std::string; -using std::set; namespace rbd { namespace mirror { -template -struct ImageSyncThrottler::C_SyncHolder : public Context { - ImageSyncThrottler *m_sync_throttler; - std::string m_local_image_id; - ImageSync *m_sync = nullptr; - Context *m_on_finish; - - C_SyncHolder(ImageSyncThrottler *sync_throttler, - const std::string &local_image_id, Context *on_finish) - : m_sync_throttler(sync_throttler), - m_local_image_id(local_image_id), m_on_finish(on_finish) { - } - - void finish(int r) override { - m_sync_throttler->handle_sync_finished(this); - m_on_finish->complete(r); - } -}; - template ImageSyncThrottler::ImageSyncThrottler() - : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs), - m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", this)) -{ - dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs - << dendl; + : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", + this)), + m_max_concurrent_syncs( + g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) { + dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl; g_ceph_context->_conf->add_observer(this); } template ImageSyncThrottler::~ImageSyncThrottler() { - { - Mutex::Locker l(m_lock); - assert(m_sync_queue.empty()); - assert(m_inflight_syncs.empty()); - } - g_ceph_context->_conf->remove_observer(this); + + Mutex::Locker locker(m_lock); + assert(m_inflight_ops.empty()); + assert(m_queue.empty()); } template -void ImageSyncThrottler::start_sync(I *local_image_ctx, I *remote_image_ctx, - SafeTimer *timer, Mutex *timer_lock, - const std::string &mirror_uuid, - Journaler *journaler, - MirrorPeerClientMeta *client_meta, - ContextWQ *work_queue, - Context *on_finish, - ProgressContext *progress_ctx) { - dout(20) << dendl; +void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { + dout(20) << "id=" << id << dendl; - C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id, - on_finish); - sync_holder_ctx->m_sync = ImageSync::create(local_image_ctx, - remote_image_ctx, timer, - timer_lock, mirror_uuid, - journaler, client_meta, - work_queue, sync_holder_ctx, - progress_ctx); - sync_holder_ctx->m_sync->get(); - - bool start = false; { - Mutex::Locker l(m_lock); - - if (m_inflight_syncs.size() < m_max_concurrent_syncs) { - assert(m_inflight_syncs.count(local_image_ctx->id) == 0); - m_inflight_syncs[local_image_ctx->id] = sync_holder_ctx; - start = true; - dout(10) << "ready to start image sync for local_image_id " - << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/" - << m_max_concurrent_syncs << "]" << dendl; + Mutex::Locker locker(m_lock); + + if (m_inflight_ops.count(id) > 0) { + dout(20) << "duplicate for already started op " << id << dendl; + } else if (m_max_concurrent_syncs == 0 || + m_inflight_ops.size() < m_max_concurrent_syncs) { + assert(m_queue.empty()); + m_inflight_ops.insert(id); + dout(20) << "ready to start sync for " << id << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" + << dendl; } else { - m_sync_queue.push_front(sync_holder_ctx); - dout(10) << "image sync for local_image_id " << local_image_ctx->id - << " has been queued" << dendl; + m_queue.push_back(std::make_pair(id, on_start)); + on_start = nullptr; + dout(20) << "image sync for " << id << " has been queued" << dendl; } } - if (start) { - sync_holder_ctx->m_sync->send(); + if (on_start != nullptr) { + on_start->complete(0); } } template -void ImageSyncThrottler::cancel_sync(const std::string &local_image_id) { - dout(20) << dendl; - - C_SyncHolder *sync_holder = nullptr; - bool running_sync = true; +bool ImageSyncThrottler::cancel_op(const std::string &id) { + dout(20) << "id=" << id << dendl; + Context *on_start = nullptr; { - Mutex::Locker l(m_lock); - if (m_inflight_syncs.empty()) { - // no image sync currently running and neither waiting - return; - } - - auto it = m_inflight_syncs.find(local_image_id); - if (it != m_inflight_syncs.end()) { - sync_holder = it->second; - } - - if (!sync_holder) { - for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) { - if ((*it)->m_local_image_id == local_image_id) { - sync_holder = (*it); - m_sync_queue.erase(it); - running_sync = false; - break; - } + Mutex::Locker locker(m_lock); + for (auto it = m_queue.begin(); it != m_queue.end(); ++it) { + if (it->first == id) { + on_start = it->second; + dout(20) << "canceled queued sync for " << id << dendl; + m_queue.erase(it); + break; } } } - if (sync_holder) { - if (running_sync) { - dout(10) << "canceled running image sync for local_image_id " - << sync_holder->m_local_image_id << dendl; - sync_holder->m_sync->cancel(); - } else { - dout(10) << "canceled waiting image sync for local_image_id " - << sync_holder->m_local_image_id << dendl; - sync_holder->m_on_finish->complete(-ECANCELED); - sync_holder->m_sync->put(); - delete sync_holder; - } + if (on_start == nullptr) { + return false; } + + on_start->complete(-ECANCELED); + return true; } template -void ImageSyncThrottler::handle_sync_finished(C_SyncHolder *sync_holder) { - dout(20) << dendl; +void ImageSyncThrottler::finish_op(const std::string &id) { + dout(20) << "id=" << id << dendl; - C_SyncHolder *next_sync_holder = nullptr; + if (cancel_op(id)) { + return; + } + Context *on_start = nullptr; { - Mutex::Locker l(m_lock); - m_inflight_syncs.erase(sync_holder->m_local_image_id); - - if (m_inflight_syncs.size() < m_max_concurrent_syncs && - !m_sync_queue.empty()) { - next_sync_holder = m_sync_queue.back(); - m_sync_queue.pop_back(); - - assert( - m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0); - m_inflight_syncs[next_sync_holder->m_local_image_id] = - next_sync_holder; - dout(10) << "ready to start image sync for local_image_id " - << next_sync_holder->m_local_image_id << " [" - << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs - << "]" << dendl; + Mutex::Locker locker(m_lock); + + m_inflight_ops.erase(id); + + if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) { + auto pair = m_queue.front(); + m_inflight_ops.insert(pair.first); + dout(20) << "ready to start sync for " << pair.first << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" + << dendl; + on_start= pair.second; + m_queue.pop_front(); } + } - dout(10) << "currently running image syncs [" << m_inflight_syncs.size() - << "/" << m_max_concurrent_syncs << "]" << dendl; + if (on_start != nullptr) { + on_start->complete(0); } +} - if (next_sync_holder) { - next_sync_holder->m_sync->send(); +template +void ImageSyncThrottler::drain(int r) { + dout(20) << dendl; + + std::list> queue; + { + Mutex::Locker locker(m_lock); + std::swap(m_queue, queue); + m_inflight_ops.clear(); + } + + for (auto &pair : queue) { + pair.second->complete(r); } } template void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { - dout(20) << " max=" << max << dendl; + dout(20) << "max=" << max << dendl; - assert(max > 0); - - std::list next_sync_holders; + std::list ops; { - Mutex::Locker l(m_lock); - this->m_max_concurrent_syncs = max; - - // Start waiting syncs in the case of available free slots - while(m_inflight_syncs.size() < m_max_concurrent_syncs - && !m_sync_queue.empty()) { - C_SyncHolder *next_sync_holder = m_sync_queue.back(); - next_sync_holders.push_back(next_sync_holder); - m_sync_queue.pop_back(); - - assert( - m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0); - m_inflight_syncs[next_sync_holder->m_local_image_id] = next_sync_holder; - - dout(10) << "ready to start image sync for local_image_id " - << next_sync_holder->m_local_image_id << " [" - << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs - << "]" << dendl; + Mutex::Locker locker(m_lock); + m_max_concurrent_syncs = max; + + // Start waiting ops in the case of available free slots + while ((m_max_concurrent_syncs == 0 || + m_inflight_ops.size() < m_max_concurrent_syncs) && + !m_queue.empty()) { + auto pair = m_queue.front(); + m_inflight_ops.insert(pair.first); + dout(20) << "ready to start sync for " << pair.first << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" + << dendl; + ops.push_back(pair.second); + m_queue.pop_front(); } } - for (const auto& sync_holder : next_sync_holders) { - sync_holder->m_sync->send(); + for (const auto& ctx : ops) { + ctx->complete(0); } } template -void ImageSyncThrottler::print_status(Formatter *f, stringstream *ss) { - Mutex::Locker l(m_lock); +void ImageSyncThrottler::print_status(Formatter *f, std::stringstream *ss) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); if (f) { f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); - f->dump_int("running_syncs", m_inflight_syncs.size()); - f->dump_int("waiting_syncs", m_sync_queue.size()); + f->dump_int("running_syncs", m_inflight_ops.size()); + f->dump_int("waiting_syncs", m_queue.size()); f->flush(*ss); } else { *ss << "[ "; *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", "; - *ss << "running_syncs=" << m_inflight_syncs.size() << ", "; - *ss << "waiting_syncs=" << m_sync_queue.size() << " ]"; + *ss << "running_syncs=" << m_inflight_ops.size() << ", "; + *ss << "waiting_syncs=" << m_queue.size() << " ]"; } } @@ -254,9 +202,8 @@ const char** ImageSyncThrottler::get_tracked_conf_keys() const { } template -void ImageSyncThrottler::handle_conf_change( - const struct md_config_t *conf, - const set &changed) { +void ImageSyncThrottler::handle_conf_change(const struct md_config_t *conf, + const set &changed) { if (changed.count("rbd_mirror_concurrent_image_syncs")) { set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs); } diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h index 64d92ece43a6ce..e0c3f0bf5a9b9c 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.h +++ b/src/tools/rbd_mirror/ImageSyncThrottler.h @@ -1,88 +1,61 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 SUSE LINUX GmbH - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ -#ifndef CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H -#define CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H +#ifndef RBD_MIRROR_IMAGE_SYNC_THROTTLER_H +#define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H #include -#include +#include +#include +#include #include + #include "common/Mutex.h" -#include "librbd/ImageCtx.h" -#include "include/Context.h" -#include "librbd/journal/TypeTraits.h" +#include "common/config_obs.h" -class CephContext; class Context; -class ContextWQ; -class SafeTimer; -namespace journal { class Journaler; } -namespace librbd { namespace journal { struct MirrorPeerClientMeta; } } + +namespace ceph { class Formatter; } +namespace librbd { class ImageCtx; } namespace rbd { namespace mirror { -template class ImageSync; - -class ProgressContext; - -/** - * Manage concurrent image-syncs - */ template class ImageSyncThrottler : public md_config_obs_t { public: - - typedef librbd::journal::TypeTraits TypeTraits; - typedef typename TypeTraits::Journaler Journaler; - typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta; + static ImageSyncThrottler *create() { + return new ImageSyncThrottler(); + } + void destroy() { + delete this; + } ImageSyncThrottler(); ~ImageSyncThrottler() override; - ImageSyncThrottler(const ImageSyncThrottler&) = delete; - ImageSyncThrottler& operator=(const ImageSyncThrottler&) = delete; - - void start_sync(ImageCtxT *local_image_ctx, - ImageCtxT *remote_image_ctx, SafeTimer *timer, - Mutex *timer_lock, const std::string &mirror_uuid, - Journaler *journaler, MirrorPeerClientMeta *client_meta, - ContextWQ *work_queue, Context *on_finish, - ProgressContext *progress_ctx = nullptr); - - void cancel_sync(const std::string &local_image_id); void set_max_concurrent_syncs(uint32_t max); + void start_op(const std::string &id, Context *on_start); + bool cancel_op(const std::string &id); + void finish_op(const std::string &id); + void drain(int r); void print_status(Formatter *f, std::stringstream *ss); private: - struct C_SyncHolder; - - void handle_sync_finished(C_SyncHolder *sync_holder); + Mutex m_lock; + uint32_t m_max_concurrent_syncs; + std::list> m_queue; + std::set m_inflight_ops; const char **get_tracked_conf_keys() const override; void handle_conf_change(const struct md_config_t *conf, const std::set &changed) override; - - uint32_t m_max_concurrent_syncs; - Mutex m_lock; - std::list m_sync_queue; - std::map m_inflight_syncs; - }; } // namespace mirror } // namespace rbd -#endif // CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H +extern template class rbd::mirror::ImageSyncThrottler; + +#endif // RBD_MIRROR_IMAGE_SYNC_THROTTLER_H diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc index d2426d0d0e4339..2b732617d8ae31 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.cc +++ b/src/tools/rbd_mirror/InstanceReplayer.cc @@ -25,12 +25,12 @@ using librbd::util::create_context_callback; template InstanceReplayer::InstanceReplayer( Threads *threads, std::shared_ptr image_deleter, - ImageSyncThrottlerRef image_sync_throttler, RadosRef local_rados, - const std::string &local_mirror_uuid, int64_t local_pool_id) - : m_threads(threads), m_image_deleter(image_deleter), - m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados), - m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), - m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { + RadosRef local_rados, const std::string &local_mirror_uuid, + int64_t local_pool_id) + : m_threads(threads), m_image_deleter(image_deleter), + m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid), + m_local_pool_id(local_pool_id), + m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { } template @@ -130,7 +130,8 @@ void InstanceReplayer::release_all(Context *on_finish) { } template -void InstanceReplayer::acquire_image(const std::string &global_image_id, +void InstanceReplayer::acquire_image(InstanceWatcher *instance_watcher, + const std::string &global_image_id, const std::string &peer_mirror_uuid, const std::string &peer_image_id, Context *on_finish) { @@ -145,8 +146,8 @@ void InstanceReplayer::acquire_image(const std::string &global_image_id, if (it == m_image_replayers.end()) { auto image_replayer = ImageReplayer::create( - m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados, - m_local_mirror_uuid, m_local_pool_id, global_image_id); + m_threads, m_image_deleter, instance_watcher, m_local_rados, + m_local_mirror_uuid, m_local_pool_id, global_image_id); dout(20) << global_image_id << ": creating replayer " << image_replayer << dendl; diff --git a/src/tools/rbd_mirror/InstanceReplayer.h b/src/tools/rbd_mirror/InstanceReplayer.h index 252a23bf95c7e0..501218ba066342 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.h +++ b/src/tools/rbd_mirror/InstanceReplayer.h @@ -20,6 +20,7 @@ namespace mirror { class ImageDeleter; template class ImageReplayer; +template class InstanceWatcher; template struct Threads; template @@ -27,10 +28,10 @@ class InstanceReplayer { public: static InstanceReplayer* create( Threads *threads, std::shared_ptr image_deleter, - ImageSyncThrottlerRef image_sync_throttler, RadosRef local_rados, - const std::string &local_mirror_uuid, int64_t local_pool_id) { - return new InstanceReplayer(threads, image_deleter, image_sync_throttler, - local_rados, local_mirror_uuid, local_pool_id); + RadosRef local_rados, const std::string &local_mirror_uuid, + int64_t local_pool_id) { + return new InstanceReplayer(threads, image_deleter, local_rados, + local_mirror_uuid, local_pool_id); } void destroy() { delete this; @@ -38,7 +39,6 @@ class InstanceReplayer { InstanceReplayer(Threads *threads, std::shared_ptr image_deleter, - ImageSyncThrottlerRef image_sync_throttler, RadosRef local_rados, const std::string &local_mirror_uuid, int64_t local_pool_id); ~InstanceReplayer(); @@ -52,7 +52,8 @@ class InstanceReplayer { void add_peer(std::string mirror_uuid, librados::IoCtx io_ctx); void remove_peer(std::string mirror_uuid); - void acquire_image(const std::string &global_image_id, + void acquire_image(InstanceWatcher *instance_watcher, + const std::string &global_image_id, const std::string &peer_mirror_uuid, const std::string &peer_image_id, Context *on_finish); @@ -109,7 +110,6 @@ class InstanceReplayer { Threads *m_threads; std::shared_ptr m_image_deleter; - ImageSyncThrottlerRef m_image_sync_throttler; RadosRef m_local_rados; std::string m_local_mirror_uuid; int64_t m_local_pool_id; diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index c42ee2fc1e07d7..cd369a40230401 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -10,6 +10,7 @@ #include "librbd/ManagedLock.h" #include "librbd/Utils.h" #include "InstanceReplayer.h" +#include "ImageSyncThrottler.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -82,24 +83,36 @@ struct C_RemoveInstanceRequest : public Context { template struct InstanceWatcher::C_NotifyInstanceRequest : public Context { InstanceWatcher *instance_watcher; - librbd::watcher::Notifier notifier; std::string instance_id; uint64_t request_id; bufferlist bl; Context *on_finish; + bool send_to_leader; + std::unique_ptr notifier; librbd::watcher::NotifyResponse response; - atomic_t canceling; + bool canceling = false; C_NotifyInstanceRequest(InstanceWatcher *instance_watcher, const std::string &instance_id, uint64_t request_id, bufferlist &&bl, Context *on_finish) - : instance_watcher(instance_watcher), - notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx, - RBD_MIRROR_INSTANCE_PREFIX + instance_id), - instance_id(instance_id), request_id(request_id), bl(bl), - on_finish(on_finish) { - instance_watcher->m_notify_op_tracker.start_op(); + : instance_watcher(instance_watcher), instance_id(instance_id), + request_id(request_id), bl(bl), on_finish(on_finish), + send_to_leader(instance_id.empty()) { + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": instance_watcher=" << instance_watcher << ", instance_id=" + << instance_id << ", request_id=" << request_id << dendl; + assert(instance_watcher->m_lock.is_locked()); + + if (!send_to_leader) { + assert((!instance_id.empty())); + notifier.reset(new librbd::watcher::Notifier( + instance_watcher->m_work_queue, + instance_watcher->m_ioctx, + RBD_MIRROR_INSTANCE_PREFIX + instance_id)); + } + + instance_watcher->m_notify_op_tracker.start_op(); auto result = instance_watcher->m_notify_ops.insert( std::make_pair(instance_id, this)).second; assert(result); @@ -108,13 +121,53 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { void send() { dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; - notifier.notify(bl, &response, this); + assert(instance_watcher->m_lock.is_locked()); + + if (canceling) { + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": canceling" << dendl; + instance_watcher->m_work_queue->queue(this, -ECANCELED); + return; + } + + if (send_to_leader) { + if (instance_watcher->m_leader_instance_id.empty()) { + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": suspending" << dendl; + instance_watcher->suspend_notify_request(this); + return; + } + + if (instance_watcher->m_leader_instance_id != instance_id) { + auto count = instance_watcher->m_notify_ops.erase( + std::make_pair(instance_id, this)); + assert(count > 0); + + instance_id = instance_watcher->m_leader_instance_id; + + auto result = instance_watcher->m_notify_ops.insert( + std::make_pair(instance_id, this)).second; + assert(result); + + notifier.reset(new librbd::watcher::Notifier( + instance_watcher->m_work_queue, + instance_watcher->m_ioctx, + RBD_MIRROR_INSTANCE_PREFIX + instance_id)); + } + } + + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": sendding to " << instance_id << dendl; + notifier->notify(bl, &response, this); } void cancel() { dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; - canceling.set(1); + assert(instance_watcher->m_lock.is_locked()); + + canceling = true; + instance_watcher->unsuspend_notify_request(this); } void finish(int r) override { @@ -158,27 +211,35 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { if (!found) { if (r == -ETIMEDOUT) { - if (canceling.read()) { - r = -ECANCELED; - } else { - derr << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": resending after timeout" << dendl; - send(); - return; - } + derr << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": resending after timeout" << dendl; + Mutex::Locker locker(instance_watcher->m_lock); + send(); + return; } else { r = -EINVAL; } + } else { + if (r == -ESTALE && send_to_leader) { + derr << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": resending due to leader change" << dendl; + Mutex::Locker locker(instance_watcher->m_lock); + send(); + return; + } } } - instance_watcher->m_notify_op_tracker.finish_op(); on_finish->complete(r); - Mutex::Locker locker(instance_watcher->m_lock); - auto result = instance_watcher->m_notify_ops.erase( + { + Mutex::Locker locker(instance_watcher->m_lock); + auto result = instance_watcher->m_notify_ops.erase( std::make_pair(instance_id, this)); - assert(result > 0); + assert(result > 0); + instance_watcher->m_notify_op_tracker.finish_op(); + } + delete this; } @@ -187,6 +248,40 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { } }; +template +struct InstanceWatcher::C_SyncRequest : public Context { + InstanceWatcher *instance_watcher; + std::string sync_id; + Context *on_start; + Context *on_complete = nullptr; + C_NotifyInstanceRequest *req = nullptr; + + C_SyncRequest(InstanceWatcher *instance_watcher, + const std::string &sync_id, Context *on_start) + : instance_watcher(instance_watcher), sync_id(sync_id), + on_start(on_start) { + dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id=" + << sync_id << dendl; + } + + void finish(int r) override { + dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r=" + << r << dendl; + + if (on_start != nullptr) { + instance_watcher->handle_notify_sync_request(this, r); + } else { + instance_watcher->handle_notify_sync_complete(this, r); + delete this; + } + } + + // called twice + void complete(int r) override { + finish(r); + } +}; + #undef dout_prefix #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \ << this << " " << __func__ << ": " @@ -237,6 +332,11 @@ InstanceWatcher::InstanceWatcher(librados::IoCtx &io_ctx, template InstanceWatcher::~InstanceWatcher() { + assert(m_notify_ops.empty()); + assert(m_notify_op_tracker.empty()); + assert(m_suspended_ops.empty()); + assert(m_inflight_sync_reqs.empty()); + assert(m_image_sync_throttler == nullptr); m_instance_lock->destroy(); } @@ -348,6 +448,180 @@ void InstanceWatcher::notify_image_release( } } +template +void InstanceWatcher::notify_sync_request(const std::string &sync_id, + Context *on_sync_start) { + dout(20) << "sync_id=" << sync_id << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_inflight_sync_reqs.count(sync_id) == 0); + + uint64_t request_id = ++m_request_seq; + + bufferlist bl; + ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl); + + auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start); + sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id, + std::move(bl), sync_ctx); + + m_inflight_sync_reqs[sync_id] = sync_ctx; + sync_ctx->req->send(); +} + +template +bool InstanceWatcher::cancel_sync_request(const std::string &sync_id) { + dout(20) << "sync_id=" << sync_id << dendl; + + Mutex::Locker locker(m_lock); + + auto it = m_inflight_sync_reqs.find(sync_id); + if (it == m_inflight_sync_reqs.end()) { + return false; + } + + auto sync_ctx = it->second; + + if (sync_ctx->on_start == nullptr) { + return false; + } + + assert(sync_ctx->req != nullptr); + sync_ctx->req->cancel(); + return true; +} + +template +void InstanceWatcher::notify_sync_start(const std::string &instance_id, + const std::string &sync_id) { + dout(20) << "sync_id=" << sync_id << dendl; + + Mutex::Locker locker(m_lock); + + uint64_t request_id = ++m_request_seq; + + bufferlist bl; + ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl); + + auto ctx = new FunctionContext( + [this, sync_id] (int r) { + dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl; + Mutex::Locker locker(m_lock); + if (r != -ESTALE && m_image_sync_throttler != nullptr) { + m_image_sync_throttler->finish_op(sync_id); + } + }); + auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, + std::move(bl), ctx); + req->send(); +} + +template +void InstanceWatcher::notify_sync_complete(const std::string &sync_id) { + dout(20) << "sync_id=" << sync_id << dendl; + + Mutex::Locker locker(m_lock); + + auto it = m_inflight_sync_reqs.find(sync_id); + assert(it != m_inflight_sync_reqs.end()); + + auto sync_ctx = it->second; + assert(sync_ctx->req == nullptr); + + m_inflight_sync_reqs.erase(it); + m_work_queue->queue(sync_ctx, 0); +} + +template +void InstanceWatcher::handle_notify_sync_request(C_SyncRequest *sync_ctx, + int r) { + dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; + + Context *on_start = nullptr; + { + Mutex::Locker locker(m_lock); + + assert(sync_ctx->req != nullptr); + assert(sync_ctx->on_start != nullptr); + + if (sync_ctx->req->canceling) { + r = -ECANCELED; + } + + std::swap(sync_ctx->on_start, on_start); + sync_ctx->req = nullptr; + } + + on_start->complete(r == -ECANCELED ? r : 0); + + if (r == -ECANCELED) { + notify_sync_complete(sync_ctx->sync_id); + } +} + +template +void InstanceWatcher::handle_notify_sync_complete(C_SyncRequest *sync_ctx, + int r) { + dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; + + if (sync_ctx->on_complete != nullptr) { + sync_ctx->on_complete->complete(r); + } +} + +template +void InstanceWatcher::print_sync_status(Formatter *f, stringstream *ss) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + if (m_image_sync_throttler != nullptr) { + m_image_sync_throttler->print_status(f, ss); + } +} + +template +void InstanceWatcher::handle_acquire_leader() { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_image_sync_throttler == nullptr); + m_image_sync_throttler = ImageSyncThrottler::create(); + + m_leader_instance_id = m_instance_id; + unsuspend_notify_requests(); +} + +template +void InstanceWatcher::handle_release_leader() { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_image_sync_throttler != nullptr); + + m_leader_instance_id.clear(); + + m_image_sync_throttler->drain(-ESTALE); + m_image_sync_throttler->destroy(); + m_image_sync_throttler = nullptr; +} + +template +void InstanceWatcher::handle_update_leader( + const std::string &leader_instance_id) { + dout(20) << "leader_instance_id=" << leader_instance_id << dendl; + + Mutex::Locker locker(m_lock); + + m_leader_instance_id = leader_instance_id; + + if (!m_leader_instance_id.empty()) { + unsuspend_notify_requests(); + } +} + template void InstanceWatcher::cancel_notify_requests( const std::string &instance_id) { @@ -356,13 +630,12 @@ void InstanceWatcher::cancel_notify_requests( Mutex::Locker locker(m_lock); for (auto op : m_notify_ops) { - if (op.first == instance_id) { + if (op.first == instance_id && !op.second->send_to_leader) { op.second->cancel(); } } } - template void InstanceWatcher::register_instance() { assert(m_lock.is_locked()); @@ -715,6 +988,46 @@ void InstanceWatcher::handle_break_instance_lock(int r) { remove_instance_object(); } +template +void InstanceWatcher::suspend_notify_request(C_NotifyInstanceRequest *req) { + dout(20) << req << dendl; + + assert(m_lock.is_locked()); + + auto result = m_suspended_ops.insert(req).second; + assert(result); +} + +template +bool InstanceWatcher::unsuspend_notify_request( + C_NotifyInstanceRequest *req) { + dout(20) << req << dendl; + + assert(m_lock.is_locked()); + + auto result = m_suspended_ops.erase(req); + if (result == 0) { + return false; + } + + req->send(); + return true; +} + +template +void InstanceWatcher::unsuspend_notify_requests() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + std::set suspended_ops; + std::swap(m_suspended_ops, suspended_ops); + + for (auto op : suspended_ops) { + op->send(); + } +} + template Context *InstanceWatcher::prepare_request(const std::string &instance_id, uint64_t request_id, @@ -733,23 +1046,11 @@ Context *InstanceWatcher::prepare_request(const std::string &instance_id, delete it->on_notify_ack; m_requests.erase(it); } else { - ctx = new FunctionContext( - [this, instance_id, request_id] (int r) { - C_NotifyAck *on_notify_ack = nullptr; - { - // update request state in the requests list - Mutex::Locker locker(m_lock); - Request request(instance_id, request_id); - auto it = m_requests.find(request); - assert(it != m_requests.end()); - on_notify_ack = it->on_notify_ack; - m_requests.erase(it); - } - - ::encode(NotifyAckPayload(instance_id, request_id, r), - on_notify_ack->out); - on_notify_ack->complete(0); - }); + ctx = create_async_context_callback( + m_work_queue, new FunctionContext( + [this, instance_id, request_id] (int r) { + complete_request(instance_id, request_id, r); + })); } request.on_notify_ack = on_notify_ack; @@ -757,6 +1058,26 @@ Context *InstanceWatcher::prepare_request(const std::string &instance_id, return ctx; } +template +void InstanceWatcher::complete_request(const std::string &instance_id, + uint64_t request_id, int r) { + dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id + << dendl; + + C_NotifyAck *on_notify_ack; + { + Mutex::Locker locker(m_lock); + Request request(instance_id, request_id); + auto it = m_requests.find(request); + assert(it != m_requests.end()); + on_notify_ack = it->on_notify_ack; + m_requests.erase(it); + } + + ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out); + on_notify_ack->complete(0); +} + template void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id, bufferlist &bl) { @@ -788,8 +1109,9 @@ void InstanceWatcher::handle_image_acquire( auto ctx = new FunctionContext( [this, global_image_id, peer_mirror_uuid, peer_image_id, on_finish] (int r) { - m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid, - peer_image_id, on_finish); + m_instance_replayer->acquire_image(this, global_image_id, + peer_mirror_uuid, peer_image_id, + on_finish); m_notify_op_tracker.finish_op(); }); @@ -816,6 +1138,58 @@ void InstanceWatcher::handle_image_release( m_work_queue->queue(ctx, 0); } +template +void InstanceWatcher::handle_sync_request(const std::string &instance_id, + const std::string &sync_id, + Context *on_finish) { + dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; + + Mutex::Locker locker(m_lock); + + if (m_image_sync_throttler == nullptr) { + dout(20) << "sync request for non-leader" << dendl; + m_work_queue->queue(on_finish, -ESTALE); + return; + } + + Context *on_start = create_async_context_callback( + m_work_queue, new FunctionContext( + [this, instance_id, sync_id, on_finish] (int r) { + dout(20) << "handle_sync_request: finish: instance_id=" << instance_id + << ", sync_id=" << sync_id << ", r=" << r << dendl; + if (r == 0) { + notify_sync_start(instance_id, sync_id); + } + on_finish->complete(r); + })); + m_image_sync_throttler->start_op(sync_id, on_start); +} + +template +void InstanceWatcher::handle_sync_start(const std::string &instance_id, + const std::string &sync_id, + Context *on_finish) { + dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; + + Mutex::Locker locker(m_lock); + + auto it = m_inflight_sync_reqs.find(sync_id); + if (it == m_inflight_sync_reqs.end()) { + dout(20) << "not found" << dendl; + m_work_queue->queue(on_finish, 0); + return; + } + + auto sync_ctx = it->second; + + if (sync_ctx->on_complete != nullptr) { + dout(20) << "duplicate request" << dendl; + m_work_queue->queue(sync_ctx->on_complete, -ESTALE); + } + + sync_ctx->on_complete = on_finish; +} + template void InstanceWatcher::handle_payload(const std::string &instance_id, const ImageAcquirePayload &payload, @@ -847,6 +1221,38 @@ void InstanceWatcher::handle_payload(const std::string &instance_id, } } +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const SyncRequestPayload &payload, + C_NotifyAck *on_notify_ack) { + dout(20) << "sync_request: instance_id=" << instance_id << ", " + << "request_id=" << payload.request_id << dendl; + + auto on_finish = prepare_request(instance_id, payload.request_id, + on_notify_ack); + if (on_finish == nullptr) { + return; + } + + handle_sync_request(instance_id, payload.sync_id, on_finish); +} + +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const SyncStartPayload &payload, + C_NotifyAck *on_notify_ack) { + dout(20) << "sync_start: instance_id=" << instance_id << ", " + << "request_id=" << payload.request_id << dendl; + + auto on_finish = prepare_request(instance_id, payload.request_id, + on_notify_ack); + if (on_finish == nullptr) { + return; + } + + handle_sync_start(instance_id, payload.sync_id, on_finish); +} + template void InstanceWatcher::handle_payload(const std::string &instance_id, const UnknownPayload &payload, diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h index a9d287f1306295..79c2d1c0714a29 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.h +++ b/src/tools/rbd_mirror/InstanceWatcher.h @@ -5,6 +5,7 @@ #define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H #include +#include #include #include #include @@ -24,6 +25,7 @@ template class ManagedLock; namespace rbd { namespace mirror { +template class ImageSyncThrottler; template class InstanceReplayer; template struct Threads; @@ -72,8 +74,18 @@ class InstanceWatcher : protected librbd::Watcher { const std::string &peer_image_id, bool schedule_delete, Context *on_notify_ack); + void notify_sync_request(const std::string &sync_id, Context *on_sync_start); + bool cancel_sync_request(const std::string &sync_id); + void notify_sync_complete(const std::string &sync_id); + + void print_sync_status(Formatter *f, stringstream *ss); + void cancel_notify_requests(const std::string &instance_id); + void handle_acquire_leader(); + void handle_release_leader(); + void handle_update_leader(const std::string &leader_instance_id); + private: /** * @verbatim @@ -105,6 +117,9 @@ class InstanceWatcher : protected librbd::Watcher { */ struct C_NotifyInstanceRequest; + struct C_SyncRequest; + + typedef std::pair Id; struct HandlePayloadVisitor : public boost::static_visitor { InstanceWatcher *instance_watcher; @@ -148,11 +163,15 @@ class InstanceWatcher : protected librbd::Watcher { Context *m_on_finish = nullptr; int m_ret_val = 0; bool m_removing = false; + std::string m_leader_instance_id; librbd::managed_lock::Locker m_instance_locker; std::set> m_notify_ops; AsyncOpTracker m_notify_op_tracker; uint64_t m_request_seq = 0; std::set m_requests; + std::set m_suspended_ops; + std::map m_inflight_sync_reqs; + ImageSyncThrottler *m_image_sync_throttler = nullptr; void register_instance(); void handle_register_instance(int r); @@ -187,8 +206,20 @@ class InstanceWatcher : protected librbd::Watcher { void break_instance_lock(); void handle_break_instance_lock(int r); + void suspend_notify_request(C_NotifyInstanceRequest *req); + bool unsuspend_notify_request(C_NotifyInstanceRequest *req); + void unsuspend_notify_requests(); + + void handle_notify_sync_request(C_SyncRequest *sync_ctx, int r); + void handle_notify_sync_complete(C_SyncRequest *sync_ctx, int r); + + void notify_sync_start(const std::string &instance_id, + const std::string &sync_id); + Context *prepare_request(const std::string &instance_id, uint64_t request_id, C_NotifyAck *on_notify_ack); + void complete_request(const std::string &instance_id, uint64_t request_id, + int r); void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id, bufferlist &bl) override; @@ -202,12 +233,23 @@ class InstanceWatcher : protected librbd::Watcher { const std::string &peer_image_id, bool schedule_delete, Context *on_finish); + void handle_sync_request(const std::string &instance_id, + const std::string &sync_id, Context *on_finish); + void handle_sync_start(const std::string &instance_id, + const std::string &sync_id, Context *on_finish); + void handle_payload(const std::string &instance_id, const instance_watcher::ImageAcquirePayload &payload, C_NotifyAck *on_notify_ack); void handle_payload(const std::string &instance_id, const instance_watcher::ImageReleasePayload &payload, C_NotifyAck *on_notify_ack); + void handle_payload(const std::string &instance_id, + const instance_watcher::SyncRequestPayload &payload, + C_NotifyAck *on_notify_ack); + void handle_payload(const std::string &instance_id, + const instance_watcher::SyncStartPayload &payload, + C_NotifyAck *on_notify_ack); void handle_payload(const std::string &instance_id, const instance_watcher::UnknownPayload &payload, C_NotifyAck *on_notify_ack); diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index 0d4957130d9df1..ca0783679baeed 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -45,6 +45,11 @@ LeaderWatcher::~LeaderWatcher() { delete m_leader_lock; } +template +std::string LeaderWatcher::get_instance_id() { + return stringify(m_notifier_id); +} + template int LeaderWatcher::init() { C_SaferCond init_ctx; @@ -552,8 +557,10 @@ void LeaderWatcher::handle_get_locker(int r, return; } + bool notify_listener = false; if (m_locker != locker) { m_locker = locker; + notify_listener = true; if (m_acquire_attempts > 1) { dout(10) << "new lock owner detected -- resetting heartbeat counter" << dendl; @@ -566,10 +573,27 @@ void LeaderWatcher::handle_get_locker(int r, dout(0) << "breaking leader lock after " << m_acquire_attempts << " " << "failed attempts to acquire" << dendl; break_leader_lock(); - } else { - schedule_acquire_leader_lock(1); + return; + } + + schedule_acquire_leader_lock(1); + + if (!notify_listener) { m_timer_op_tracker.finish_op(); + return; } + + auto ctx = new FunctionContext( + [this](int r) { + std::string instance_id; + if (get_leader_instance_id(&instance_id)) { + m_listener->update_leader_handler(instance_id); + } + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + m_timer_op_tracker.finish_op(); + }); + m_work_queue->queue(ctx, 0); } template diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h index b4aac4010fa97f..980e2e6826add2 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.h +++ b/src/tools/rbd_mirror/LeaderWatcher.h @@ -33,6 +33,9 @@ class LeaderWatcher : protected librbd::Watcher { virtual void post_acquire_handler(Context *on_finish) = 0; virtual void pre_release_handler(Context *on_finish) = 0; + + virtual void update_leader_handler( + const std::string &leader_instance_id) = 0; }; LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, @@ -51,6 +54,8 @@ class LeaderWatcher : protected librbd::Watcher { void release_leader(); void list_instances(std::vector *instance_ids); + std::string get_instance_id(); + private: /** * @verbatim diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc index 58aa6f31b1a355..a5a727ecd20b18 100644 --- a/src/tools/rbd_mirror/PoolReplayer.cc +++ b/src/tools/rbd_mirror/PoolReplayer.cc @@ -303,11 +303,8 @@ int PoolReplayer::init() dout(20) << "connected to " << m_peer << dendl; - m_image_sync_throttler.reset(new ImageSyncThrottler<>()); - m_instance_replayer.reset( - InstanceReplayer<>::create(m_threads, m_image_deleter, - m_image_sync_throttler, m_local_rados, + InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados, local_mirror_uuid, m_local_pool_id)); m_instance_replayer->init(); m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx); @@ -323,6 +320,7 @@ int PoolReplayer::init() m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, &m_leader_listener)); + r = m_leader_watcher->init(); if (r < 0) { derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; @@ -477,7 +475,7 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss) admin_socket); f->open_object_section("sync_throttler"); - m_image_sync_throttler->print_status(f, ss); + m_instance_watcher->print_sync_status(f, ss); f->close_section(); m_instance_replayer->print_status(f, ss); @@ -626,11 +624,15 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid, void PoolReplayer::handle_post_acquire_leader(Context *on_finish) { dout(20) << dendl; + + m_instance_watcher->handle_acquire_leader(); init_local_pool_watcher(on_finish); } void PoolReplayer::handle_pre_release_leader(Context *on_finish) { dout(20) << dendl; + + m_instance_watcher->handle_release_leader(); shut_down_pool_watchers(on_finish); } @@ -737,5 +739,11 @@ void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) { m_instance_replayer->release_all(on_finish); } +void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) { + dout(20) << "leader_instance_id=" << leader_instance_id << dendl; + + m_instance_watcher->handle_update_leader(leader_instance_id); +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h index 820134e423434a..09ad6835d9c5a8 100644 --- a/src/tools/rbd_mirror/PoolReplayer.h +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -99,9 +99,10 @@ class PoolReplayer { void wait_for_update_ops(Context *on_finish); void handle_wait_for_update_ops(int r, Context *on_finish); + void handle_update_leader(const std::string &leader_instance_id); + Threads *m_threads; std::shared_ptr m_image_deleter; - ImageSyncThrottlerRef<> m_image_sync_throttler; mutable Mutex m_lock; Cond m_cond; std::atomic m_stopping = { false }; @@ -158,6 +159,11 @@ class PoolReplayer { m_pool_replayer->handle_pre_release_leader(on_finish); } + void update_leader_handler( + const std::string &leader_instance_id) override { + m_pool_replayer->handle_update_leader(leader_instance_id); + } + private: PoolReplayer *m_pool_replayer; } m_leader_listener; diff --git a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc index a5a609708ece57..4dadc744f6c9e6 100644 --- a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc +++ b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc @@ -21,7 +21,7 @@ #include "librbd/Utils.h" #include "librbd/journal/Types.h" #include "tools/rbd_mirror/ProgressContext.h" -#include "tools/rbd_mirror/ImageSyncThrottler.h" +#include "tools/rbd_mirror/ImageSync.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror @@ -41,7 +41,7 @@ template BootstrapRequest::BootstrapRequest( librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx, - std::shared_ptr> image_sync_throttler, + InstanceWatcher *instance_watcher, I **local_image_ctx, const std::string &local_image_id, const std::string &remote_image_id, @@ -58,10 +58,10 @@ BootstrapRequest::BootstrapRequest( : BaseRequest("rbd::mirror::image_replayer::BootstrapRequest", reinterpret_cast(local_io_ctx.cct()), on_finish), m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx), - m_image_sync_throttler(image_sync_throttler), - m_local_image_ctx(local_image_ctx), m_local_image_id(local_image_id), - m_remote_image_id(remote_image_id), m_global_image_id(global_image_id), - m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock), + m_instance_watcher(instance_watcher), m_local_image_ctx(local_image_ctx), + m_local_image_id(local_image_id), m_remote_image_id(remote_image_id), + m_global_image_id(global_image_id), m_work_queue(work_queue), + m_timer(timer), m_timer_lock(timer_lock), m_local_mirror_uuid(local_mirror_uuid), m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler), m_client_meta(client_meta), m_progress_ctx(progress_ctx), @@ -88,7 +88,9 @@ void BootstrapRequest::cancel() { Mutex::Locker locker(m_lock); m_canceled = true; - m_image_sync_throttler->cancel_sync(m_local_image_id); + if (m_image_sync != nullptr) { + m_image_sync->cancel(); + } } template @@ -643,19 +645,22 @@ void BootstrapRequest::image_sync() { { Mutex::Locker locker(m_lock); - if (!m_canceled) { - m_image_sync_throttler->start_sync(*m_local_image_ctx, - m_remote_image_ctx, m_timer, - m_timer_lock, - m_local_mirror_uuid, m_journaler, - m_client_meta, m_work_queue, ctx, - m_progress_ctx); + if (m_canceled) { + m_ret_val = -ECANCELED; + } else { + assert(m_image_sync == nullptr); + m_image_sync = ImageSync::create( + *m_local_image_ctx, m_remote_image_ctx, m_timer, m_timer_lock, + m_local_mirror_uuid, m_journaler, m_client_meta, m_work_queue, + m_instance_watcher, ctx, m_progress_ctx); + + m_image_sync->get(); + m_image_sync->send(); return; } } dout(10) << ": request canceled" << dendl; - m_ret_val = -ECANCELED; close_remote_image(); } @@ -663,14 +668,21 @@ template void BootstrapRequest::handle_image_sync(int r) { dout(20) << ": r=" << r << dendl; - if (m_canceled) { - dout(10) << ": request canceled" << dendl; - m_ret_val = -ECANCELED; - } + { + Mutex::Locker locker(m_lock); - if (r < 0) { - derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl; - m_ret_val = r; + m_image_sync->put(); + m_image_sync = nullptr; + + if (m_canceled) { + dout(10) << ": request canceled" << dendl; + m_ret_val = -ECANCELED; + } + + if (r < 0) { + derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl; + m_ret_val = r; + } } close_remote_image(); diff --git a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h index 6e7556894882d5..5696cce61fdd1e 100644 --- a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h +++ b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h @@ -27,6 +27,9 @@ namespace mirror { class ProgressContext; +template class ImageSync; +template class InstanceWatcher; + namespace image_replayer { template @@ -40,7 +43,7 @@ class BootstrapRequest : public BaseRequest { static BootstrapRequest* create( librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx, - ImageSyncThrottlerRef image_sync_throttler, + InstanceWatcher *instance_watcher, ImageCtxT **local_image_ctx, const std::string &local_image_id, const std::string &remote_image_id, @@ -55,7 +58,7 @@ class BootstrapRequest : public BaseRequest { bool *do_resync, ProgressContext *progress_ctx = nullptr) { return new BootstrapRequest(local_io_ctx, remote_io_ctx, - image_sync_throttler, local_image_ctx, + instance_watcher, local_image_ctx, local_image_id, remote_image_id, global_image_id, work_queue, timer, timer_lock, local_mirror_uuid, remote_mirror_uuid, @@ -65,7 +68,7 @@ class BootstrapRequest : public BaseRequest { BootstrapRequest(librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx, - ImageSyncThrottlerRef image_sync_throttler, + InstanceWatcher *instance_watcher, ImageCtxT **local_image_ctx, const std::string &local_image_id, const std::string &remote_image_id, @@ -145,7 +148,7 @@ class BootstrapRequest : public BaseRequest { librados::IoCtx &m_local_io_ctx; librados::IoCtx &m_remote_io_ctx; - ImageSyncThrottlerRef m_image_sync_throttler; + InstanceWatcher *m_instance_watcher; ImageCtxT **m_local_image_ctx; std::string m_local_image_id; std::string m_remote_image_id; @@ -159,6 +162,7 @@ class BootstrapRequest : public BaseRequest { MirrorPeerClientMeta *m_client_meta; ProgressContext *m_progress_ctx; bool *m_do_resync; + Mutex m_lock; bool m_canceled = false; @@ -168,6 +172,7 @@ class BootstrapRequest : public BaseRequest { ImageCtxT *m_remote_image_ctx = nullptr; bool m_primary = false; int m_ret_val = 0; + ImageSync *m_image_sync = nullptr; bufferlist m_out_bl; diff --git a/src/tools/rbd_mirror/instance_watcher/Types.cc b/src/tools/rbd_mirror/instance_watcher/Types.cc index a741d1ef913fb8..83ab5d84f8f576 100644 --- a/src/tools/rbd_mirror/instance_watcher/Types.cc +++ b/src/tools/rbd_mirror/instance_watcher/Types.cc @@ -58,22 +58,34 @@ class DumpPayloadVisitor : public boost::static_visitor { } // anonymous namespace -void ImagePayloadBase::encode(bufferlist &bl) const { +void PayloadBase::encode(bufferlist &bl) const { ::encode(request_id, bl); +} + +void PayloadBase::decode(__u8 version, bufferlist::iterator &iter) { + ::decode(request_id, iter); +} + +void PayloadBase::dump(Formatter *f) const { + f->dump_unsigned("request_id", request_id); +} + +void ImagePayloadBase::encode(bufferlist &bl) const { + PayloadBase::encode(bl); ::encode(global_image_id, bl); ::encode(peer_mirror_uuid, bl); ::encode(peer_image_id, bl); } void ImagePayloadBase::decode(__u8 version, bufferlist::iterator &iter) { - ::decode(request_id, iter); + PayloadBase::decode(version, iter); ::decode(global_image_id, iter); ::decode(peer_mirror_uuid, iter); ::decode(peer_image_id, iter); } void ImagePayloadBase::dump(Formatter *f) const { - f->dump_unsigned("request_id", request_id); + PayloadBase::dump(f); f->dump_string("global_image_id", global_image_id); f->dump_string("peer_mirror_uuid", peer_mirror_uuid); f->dump_string("peer_image_id", peer_image_id); @@ -94,6 +106,21 @@ void ImageReleasePayload::dump(Formatter *f) const { f->dump_bool("schedule_delete", schedule_delete); } +void SyncPayloadBase::encode(bufferlist &bl) const { + PayloadBase::encode(bl); + ::encode(sync_id, bl); +} + +void SyncPayloadBase::decode(__u8 version, bufferlist::iterator &iter) { + PayloadBase::decode(version, iter); + ::decode(sync_id, iter); +} + +void SyncPayloadBase::dump(Formatter *f) const { + PayloadBase::dump(f); + f->dump_string("sync_id", sync_id); +} + void UnknownPayload::encode(bufferlist &bl) const { assert(false); } @@ -124,6 +151,12 @@ void NotifyMessage::decode(bufferlist::iterator& iter) { case NOTIFY_OP_IMAGE_RELEASE: payload = ImageReleasePayload(); break; + case NOTIFY_OP_SYNC_REQUEST: + payload = SyncRequestPayload(); + break; + case NOTIFY_OP_SYNC_START: + payload = SyncStartPayload(); + break; default: payload = UnknownPayload(); break; @@ -144,6 +177,12 @@ void NotifyMessage::generate_test_instances(std::list &o) { o.push_back(new NotifyMessage(ImageReleasePayload())); o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id", true))); + + o.push_back(new NotifyMessage(SyncRequestPayload())); + o.push_back(new NotifyMessage(SyncRequestPayload(1, "sync_id"))); + + o.push_back(new NotifyMessage(SyncStartPayload())); + o.push_back(new NotifyMessage(SyncStartPayload(1, "sync_id"))); } std::ostream &operator<<(std::ostream &out, const NotifyOp &op) { @@ -154,6 +193,12 @@ std::ostream &operator<<(std::ostream &out, const NotifyOp &op) { case NOTIFY_OP_IMAGE_RELEASE: out << "ImageRelease"; break; + case NOTIFY_OP_SYNC_REQUEST: + out << "SyncRequest"; + break; + case NOTIFY_OP_SYNC_START: + out << "SyncStart"; + break; default: out << "Unknown (" << static_cast(op) << ")"; break; diff --git a/src/tools/rbd_mirror/instance_watcher/Types.h b/src/tools/rbd_mirror/instance_watcher/Types.h index 7299480cf28147..53b333c34afc98 100644 --- a/src/tools/rbd_mirror/instance_watcher/Types.h +++ b/src/tools/rbd_mirror/instance_watcher/Types.h @@ -21,21 +21,36 @@ namespace instance_watcher { enum NotifyOp { NOTIFY_OP_IMAGE_ACQUIRE = 0, NOTIFY_OP_IMAGE_RELEASE = 1, + NOTIFY_OP_SYNC_REQUEST = 2, + NOTIFY_OP_SYNC_START = 3, }; -struct ImagePayloadBase { +struct PayloadBase { uint64_t request_id; + + PayloadBase() : request_id(0) { + } + + PayloadBase(uint64_t request_id) : request_id(request_id) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); + void dump(Formatter *f) const; +}; + +struct ImagePayloadBase : public PayloadBase { std::string global_image_id; std::string peer_mirror_uuid; std::string peer_image_id; - ImagePayloadBase() : request_id(0) { + ImagePayloadBase() : PayloadBase() { } ImagePayloadBase(uint64_t request_id, const std::string &global_image_id, const std::string &peer_mirror_uuid, const std::string &peer_image_id) - : request_id(request_id), global_image_id(global_image_id), + : PayloadBase(request_id), global_image_id(global_image_id), peer_mirror_uuid(peer_mirror_uuid), peer_image_id(peer_image_id) { } @@ -79,6 +94,43 @@ struct ImageReleasePayload : public ImagePayloadBase { void dump(Formatter *f) const; }; +struct SyncPayloadBase : public PayloadBase { + std::string sync_id; + + SyncPayloadBase() : PayloadBase() { + } + + SyncPayloadBase(uint64_t request_id, const std::string &sync_id) + : PayloadBase(request_id), sync_id(sync_id) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); + void dump(Formatter *f) const; +}; + +struct SyncRequestPayload : public SyncPayloadBase { + static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_REQUEST; + + SyncRequestPayload() : SyncPayloadBase() { + } + + SyncRequestPayload(uint64_t request_id, const std::string &sync_id) + : SyncPayloadBase(request_id, sync_id) { + } +}; + +struct SyncStartPayload : public SyncPayloadBase { + static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_START; + + SyncStartPayload() : SyncPayloadBase() { + } + + SyncStartPayload(uint64_t request_id, const std::string &sync_id) + : SyncPayloadBase(request_id, sync_id) { + } +}; + struct UnknownPayload { static const NotifyOp NOTIFY_OP = static_cast(-1); @@ -92,6 +144,8 @@ struct UnknownPayload { typedef boost::variant Payload; struct NotifyMessage { diff --git a/src/tools/rbd_mirror/leader_watcher/Types.h b/src/tools/rbd_mirror/leader_watcher/Types.h index 122a916c76f9b6..b6b3849301a686 100644 --- a/src/tools/rbd_mirror/leader_watcher/Types.h +++ b/src/tools/rbd_mirror/leader_watcher/Types.h @@ -16,9 +16,9 @@ namespace mirror { namespace leader_watcher { enum NotifyOp { - NOTIFY_OP_HEARTBEAT = 0, - NOTIFY_OP_LOCK_ACQUIRED = 1, - NOTIFY_OP_LOCK_RELEASED = 2, + NOTIFY_OP_HEARTBEAT = 0, + NOTIFY_OP_LOCK_ACQUIRED = 1, + NOTIFY_OP_LOCK_RELEASED = 2, }; struct HeartbeatPayload { diff --git a/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc b/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc index 1a6ed8c0541a7b..3b216cc33d818b 100644 --- a/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc +++ b/src/tools/rbd_mirror/pool_watcher/RefreshImagesRequest.cc @@ -2,7 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h" -#include "common/dout.h" +#include "common/debug.h" #include "common/errno.h" #include "cls/rbd/cls_rbd_client.h" #include "librbd/Utils.h" diff --git a/src/tools/rbd_mirror/types.h b/src/tools/rbd_mirror/types.h index 829998d74e9a08..cec8a4deb63470 100644 --- a/src/tools/rbd_mirror/types.h +++ b/src/tools/rbd_mirror/types.h @@ -11,7 +11,6 @@ #include #include "include/rbd/librbd.hpp" -#include "ImageSyncThrottler.h" namespace rbd { namespace mirror { @@ -20,9 +19,6 @@ typedef shared_ptr RadosRef; typedef shared_ptr IoCtxRef; typedef shared_ptr ImageRef; -template -using ImageSyncThrottlerRef = std::shared_ptr>; - struct ImageId { std::string global_id; std::string id;