Skip to content

Commit

Permalink
rbd-mirror A/A: coordinate image syncs with leader
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/18789
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
  • Loading branch information
Mykola Golub committed May 9, 2017
1 parent e74c216 commit 9651873
Show file tree
Hide file tree
Showing 17 changed files with 1,262 additions and 76 deletions.
18 changes: 11 additions & 7 deletions src/test/rbd_mirror/test_ImageReplayer.cc
Expand Up @@ -37,7 +37,7 @@
#include "tools/rbd_mirror/types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/ImageDeleter.h"

Expand Down Expand Up @@ -119,16 +119,21 @@ class TestImageReplayer : public ::rbd::mirror::TestFixture {
m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock));
m_instance_sync_throttler.reset(new rbd::mirror::InstanceSyncThrottler<>());
m_image_sync_throttler.reset(
new rbd::mirror::ImageSyncThrottler<>(m_instance_sync_throttler.get()));
m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
m_local_ioctx, m_threads->work_queue, nullptr);
m_instance_watcher->handle_acquire_leader();
m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>());
m_image_sync_throttler->init(m_instance_watcher);
}

~TestImageReplayer() override
~TestImageReplayer() override
{
unwatch();

m_instance_watcher->handle_release_leader();

delete m_replayer;
delete m_instance_watcher;
delete m_threads;

EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
Expand Down Expand Up @@ -367,8 +372,7 @@ class TestImageReplayer : public ::rbd::mirror::TestFixture {
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<librados::Rados> m_local_cluster;
librados::Rados m_remote_cluster;
std::shared_ptr<
rbd::mirror::InstanceSyncThrottler<>> m_instance_sync_throttler;
rbd::mirror::InstanceWatcher<> *m_instance_watcher;
std::shared_ptr<rbd::mirror::ImageSyncThrottler<>> m_image_sync_throttler;
std::string m_local_mirror_uuid = "local mirror uuid";
std::string m_remote_mirror_uuid = "remote mirror uuid";
Expand Down
3 changes: 3 additions & 0 deletions src/test/rbd_mirror/test_LeaderWatcher.cc
Expand Up @@ -71,6 +71,9 @@ class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
}
}

void update_leader_handler(const std::string &leader_instance_id) override {
}

private:
mutable Mutex m_test_lock;
int m_acquire_count = 0;
Expand Down
120 changes: 106 additions & 14 deletions src/test/rbd_mirror/test_mock_ImageSyncThrottler.cc
Expand Up @@ -17,8 +17,9 @@
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
#include "librbd/ImageState.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/ImageSync.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"

namespace librbd {

Expand Down Expand Up @@ -46,6 +47,70 @@ namespace rbd {
namespace mirror {

using ::testing::Invoke;
using ::testing::Return;

template <>
struct Threads<librbd::MockTestImageCtx> {
Mutex &timer_lock;
SafeTimer *timer;
ContextWQ *work_queue;

Threads(Threads<librbd::ImageCtx> *threads)
: timer_lock(threads->timer_lock), timer(threads->timer),
work_queue(threads->work_queue) {
}
};

template<>
struct InstanceWatcher<librbd::MockTestImageCtx> {
size_t max_concurrent_syncs = 0;
std::set<std::string> syncs;
std::list<std::pair<std::string, Context *>> sync_queue;

virtual ~InstanceWatcher() {
EXPECT_TRUE(syncs.empty());
EXPECT_TRUE(sync_queue.empty());
}

void notify_start_sync(const std::string &sync_id, Context *on_start) {
sync_queue.push_back(std::make_pair(sync_id, on_start));
start_syncs();
}

bool notify_cancel_sync(const std::string &sync_id) {
for (auto it = sync_queue.begin(); it != sync_queue.end(); it++) {
if (it->first == sync_id) {
it->second->complete(-ECANCELED);
sync_queue.erase(it);
return true;
}
}

EXPECT_TRUE(syncs.count(sync_id) > 0);
return false;
}

void notify_finish_sync(const std::string &sync_id) {
EXPECT_TRUE(syncs.erase(sync_id) > 0);
start_syncs();
}

void set_max_concurrent_syncs(size_t max_syncs) {
max_concurrent_syncs = max_syncs;
start_syncs();
}

void start_syncs() {
while ((max_concurrent_syncs == 0 || syncs.size() < max_concurrent_syncs) &&
!sync_queue.empty()) {
auto pair = sync_queue.front();
EXPECT_EQ(0U, syncs.count(pair.first));
EXPECT_TRUE(syncs.insert(pair.first).second);
pair.second->complete(0);
sync_queue.pop_front();
}
}
};

typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;

Expand All @@ -55,6 +120,7 @@ class ImageSync<librbd::MockTestImageCtx> {
static std::vector<MockImageSync *> instances;

Context *on_finish;
C_SaferCond on_sync_start;
bool syncing = false;

static ImageSync* create(librbd::MockTestImageCtx *local_image_ctx,
Expand All @@ -71,6 +137,7 @@ class ImageSync<librbd::MockTestImageCtx> {
EXPECT_CALL(*sync, send())
.WillRepeatedly(Invoke([sync]() {
sync->syncing = true;
sync->on_sync_start.complete(0);
}));

return sync;
Expand Down Expand Up @@ -100,7 +167,6 @@ std::vector<MockImageSync *> MockImageSync::instances;


// template definitions
#include "tools/rbd_mirror/InstanceSyncThrottler.cc"
#include "tools/rbd_mirror/ImageSyncThrottler.cc"

namespace rbd {
Expand All @@ -110,20 +176,24 @@ class TestMockImageSyncThrottler : public TestMockFixture {
public:
typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
typedef InstanceSyncThrottler<librbd::MockTestImageCtx> MockInstanceSyncThrottler;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
typedef Threads<librbd::MockTestImageCtx> MockThreads;

void SetUp() override {
TestMockFixture::SetUp();

m_mock_threads = new MockThreads(m_threads);

librbd::RBD rbd;
ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, m_image_size));
ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, &m_remote_image_ctx));

ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size));
ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx));

m_mock_instance_sync_throttler = new MockInstanceSyncThrottler();
mock_sync_throttler =
new MockImageSyncThrottler(m_mock_instance_sync_throttler);
mock_sync_throttler = new MockImageSyncThrottler();
m_mock_instance_watcher = new MockInstanceWatcher();
mock_sync_throttler->init(m_mock_instance_watcher);

m_mock_local_image_ctx = new librbd::MockTestImageCtx(*m_local_image_ctx);
m_mock_remote_image_ctx = new librbd::MockTestImageCtx(*m_remote_image_ctx);
Expand All @@ -133,10 +203,11 @@ class TestMockImageSyncThrottler : public TestMockFixture {
void TearDown() override {
MockImageSync::instances.clear();
delete mock_sync_throttler;
delete m_mock_instance_sync_throttler;
delete m_mock_instance_watcher;
delete m_mock_local_image_ctx;
delete m_mock_remote_image_ctx;
delete m_mock_journaler;
delete m_mock_threads;
TestMockFixture::TearDown();
}

Expand Down Expand Up @@ -172,8 +243,9 @@ class TestMockImageSyncThrottler : public TestMockFixture {
librbd::MockTestImageCtx *m_mock_remote_image_ctx;
journal::MockJournaler *m_mock_journaler;
librbd::journal::MirrorPeerClientMeta m_client_meta;
MockInstanceSyncThrottler *m_mock_instance_sync_throttler;
MockInstanceWatcher *m_mock_instance_watcher;
MockImageSyncThrottler *mock_sync_throttler;
MockThreads *m_mock_threads;
};

TEST_F(TestMockImageSyncThrottler, Single_Sync) {
Expand All @@ -182,13 +254,14 @@ TEST_F(TestMockImageSyncThrottler, Single_Sync) {

ASSERT_EQ(1u, MockImageSync::instances.size());
MockImageSync *sync = MockImageSync::instances[0];
ASSERT_EQ(0, sync->on_sync_start.wait());
ASSERT_EQ(true, sync->syncing);
sync->finish(0);
ASSERT_EQ(0, ctx.wait());
}

TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
m_mock_instance_sync_throttler->set_max_concurrent_syncs(2);
m_mock_instance_watcher->set_max_concurrent_syncs(2);

C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
Expand All @@ -202,9 +275,11 @@ TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
ASSERT_EQ(4u, MockImageSync::instances.size());

MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_EQ(0, sync1->on_sync_start.wait());
ASSERT_TRUE(sync1->syncing);

MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_EQ(0, sync2->on_sync_start.wait());
ASSERT_TRUE(sync2->syncing);

MockImageSync *sync3 = MockImageSync::instances[2];
Expand All @@ -216,10 +291,12 @@ TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
sync1->finish(0);
ASSERT_EQ(0, ctx1.wait());

ASSERT_EQ(0, sync3->on_sync_start.wait());
ASSERT_TRUE(sync3->syncing);
sync3->finish(-EINVAL);
ASSERT_EQ(-EINVAL, ctx3.wait());

ASSERT_EQ(0, sync4->on_sync_start.wait());
ASSERT_TRUE(sync4->syncing);

sync2->finish(0);
Expand All @@ -238,9 +315,11 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) {
ASSERT_EQ(2u, MockImageSync::instances.size());

MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_EQ(0, sync1->on_sync_start.wait());
ASSERT_TRUE(sync1->syncing);

MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_EQ(0, sync2->on_sync_start.wait());
ASSERT_TRUE(sync2->syncing);

cancel("image_id_2", sync2);
Expand All @@ -251,7 +330,7 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) {
}

TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
m_mock_instance_sync_throttler->set_max_concurrent_syncs(1);
m_mock_instance_watcher->set_max_concurrent_syncs(1);

C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
Expand All @@ -261,6 +340,7 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
ASSERT_EQ(2u, MockImageSync::instances.size());

MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_EQ(0, sync1->on_sync_start.wait());
ASSERT_TRUE(sync1->syncing);

MockImageSync *sync2 = MockImageSync::instances[1];
Expand All @@ -274,7 +354,7 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
}

TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
m_mock_instance_sync_throttler->set_max_concurrent_syncs(1);
m_mock_instance_watcher->set_max_concurrent_syncs(1);

C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
Expand All @@ -284,6 +364,7 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
ASSERT_EQ(2u, MockImageSync::instances.size());

MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_EQ(0, sync1->on_sync_start.wait());
ASSERT_TRUE(sync1->syncing);

MockImageSync *sync2 = MockImageSync::instances[1];
Expand All @@ -292,13 +373,14 @@ TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
cancel("image_id_1", sync1);
ASSERT_EQ(-ECANCELED, ctx1.wait());

ASSERT_EQ(0, sync2->on_sync_start.wait());
ASSERT_TRUE(sync2->syncing);
sync2->finish(0);
ASSERT_EQ(0, ctx2.wait());
}

TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
m_mock_instance_sync_throttler->set_max_concurrent_syncs(2);
m_mock_instance_watcher->set_max_concurrent_syncs(2);

C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
Expand All @@ -314,9 +396,11 @@ TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
ASSERT_EQ(5u, MockImageSync::instances.size());

MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_EQ(0, sync1->on_sync_start.wait());
ASSERT_TRUE(sync1->syncing);

MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_EQ(0, sync2->on_sync_start.wait());
ASSERT_TRUE(sync2->syncing);

MockImageSync *sync3 = MockImageSync::instances[2];
Expand All @@ -328,15 +412,18 @@ TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
MockImageSync *sync5 = MockImageSync::instances[4];
ASSERT_FALSE(sync5->syncing);

m_mock_instance_sync_throttler->set_max_concurrent_syncs(4);
m_mock_instance_watcher->set_max_concurrent_syncs(4);

ASSERT_EQ(0, sync3->on_sync_start.wait());
ASSERT_TRUE(sync3->syncing);
ASSERT_EQ(0, sync4->on_sync_start.wait());
ASSERT_TRUE(sync4->syncing);
ASSERT_FALSE(sync5->syncing);

sync1->finish(0);
ASSERT_EQ(0, ctx1.wait());

ASSERT_EQ(0, sync5->on_sync_start.wait());
ASSERT_TRUE(sync5->syncing);
sync5->finish(-EINVAL);
ASSERT_EQ(-EINVAL, ctx5.wait());
Expand All @@ -352,7 +439,7 @@ TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
}

TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
m_mock_instance_sync_throttler->set_max_concurrent_syncs(4);
m_mock_instance_watcher->set_max_concurrent_syncs(4);

C_SaferCond ctx1;
start_sync("image_id_1", &ctx1);
Expand All @@ -368,21 +455,25 @@ TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
ASSERT_EQ(5u, MockImageSync::instances.size());

MockImageSync *sync1 = MockImageSync::instances[0];
ASSERT_EQ(0, sync1->on_sync_start.wait());
ASSERT_TRUE(sync1->syncing);

MockImageSync *sync2 = MockImageSync::instances[1];
ASSERT_EQ(0, sync2->on_sync_start.wait());
ASSERT_TRUE(sync2->syncing);

MockImageSync *sync3 = MockImageSync::instances[2];
ASSERT_EQ(0, sync3->on_sync_start.wait());
ASSERT_TRUE(sync3->syncing);

MockImageSync *sync4 = MockImageSync::instances[3];
ASSERT_EQ(0, sync4->on_sync_start.wait());
ASSERT_TRUE(sync4->syncing);

MockImageSync *sync5 = MockImageSync::instances[4];
ASSERT_FALSE(sync5->syncing);

m_mock_instance_sync_throttler->set_max_concurrent_syncs(2);
m_mock_instance_watcher->set_max_concurrent_syncs(2);

ASSERT_FALSE(sync5->syncing);

Expand All @@ -399,6 +490,7 @@ TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
sync3->finish(0);
ASSERT_EQ(0, ctx3.wait());

ASSERT_EQ(0, sync5->on_sync_start.wait());
ASSERT_TRUE(sync5->syncing);

sync4->finish(0);
Expand Down

0 comments on commit 9651873

Please sign in to comment.