rbd-mirror: stop replay when client is disconnected
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
Mykola Golub committed Aug 15, 2016
1 parent 2d798f2 commit be3571f
Showing 7 changed files with 227 additions and 15 deletions.
47 changes: 47 additions & 0 deletions qa/workunits/rbd/rbd_mirror.sh
@@ -243,4 +243,51 @@ wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position'
compare_images ${POOL} ${image}

testlog "TEST: client disconnect"
image=laggy
create_image ${CLUSTER2} ${POOL} ${image} 128 --journal-object-size 64K
write_image ${CLUSTER2} ${POOL} ${image} 10

testlog " - replay stopped after disconnect"
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
disconnect_image ${CLUSTER2} ${POOL} ${image}
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

testlog " - replay started after resync requested"
request_resync_image ${CLUSTER1} ${POOL} ${image}
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}

testlog " - disconnected after max_concurrent_object_sets reached"
admin_daemon ${CLUSTER1} rbd mirror stop ${POOL}/${image}
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
set_image_meta ${CLUSTER2} ${POOL} ${image} \
conf_rbd_journal_max_concurrent_object_sets 1
write_image ${CLUSTER2} ${POOL} ${image} 20 16384
write_image ${CLUSTER2} ${POOL} ${image} 20 16384
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
set_image_meta ${CLUSTER2} ${POOL} ${image} \
conf_rbd_journal_max_concurrent_object_sets 0

testlog " - replay is still stopped (disconnected) after restart"
admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

testlog " - replay started after resync requested"
request_resync_image ${CLUSTER1} ${POOL} ${image}
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}

echo OK
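
For reference, the flow the test above automates can be reproduced by hand. A minimal sketch, assuming two clusters reachable as "cluster2" (primary) and "cluster1" (backup), pool "rbd", and a mirrored image "laggy"; every command used here appears in this change or in the helpers it extends:

# Flag the backup cluster's journal client as disconnected on the primary.
# Its commit position disappears from the journal status, and rbd-mirror on
# cluster1 stops replay with state up+error and description "disconnected".
rbd --cluster cluster2 -p rbd journal client disconnect --image laggy
rbd --cluster cluster2 -p rbd journal status --image laggy

# Alternatively, force an automatic disconnect: cap how many journal object
# sets a laggy client may hold back, then write past that limit.
rbd --cluster cluster2 -p rbd image-meta set laggy \
    conf_rbd_journal_max_concurrent_object_sets 1
rbd --cluster cluster2 -p rbd bench-write laggy \
    --io-size 16384 --io-threads 1 --io-total $((16384 * 20)) --io-pattern rand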
38 changes: 34 additions & 4 deletions qa/workunits/rbd/rbd_mirror_helpers.sh
@@ -418,7 +418,7 @@ get_position()
local status_log=${TEMPDIR}/${CLUSTER2}-${pool}-${image}.status
rbd --cluster ${cluster} -p ${pool} journal status --image ${image} |
tee ${status_log} >&2
-    sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*$/\1/p' \
+    sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*state=connected.*$/\1/p' \
${status_log}
}

@@ -488,13 +488,30 @@ test_status_in_pool_dir()
}

create_image()
{
local cluster=$1 ; shift
local pool=$1 ; shift
local image=$1 ; shift
local size=128

if [ -n "$1" ]; then
size=$1
shift
fi

rbd --cluster ${cluster} -p ${pool} create --size ${size} \
--image-feature layering,exclusive-lock,journaling $@ ${image}
}

set_image_meta()
{
local cluster=$1
local pool=$2
local image=$3
local key=$4
local val=$5

-    rbd --cluster ${cluster} -p ${pool} create --size 128 \
-        --image-feature layering,exclusive-lock,journaling ${image}
+    rbd --cluster ${cluster} -p ${pool} image-meta set ${image} $key $val
}

remove_image()
@@ -532,6 +549,16 @@ clone_image()
${clone_pool}/${clone_image} --image-feature layering,exclusive-lock,journaling
}

disconnect_image()
{
local cluster=$1
local pool=$2
local image=$3

rbd --cluster ${cluster} -p ${pool} journal client disconnect \
--image ${image}
}

create_snapshot()
{
local cluster=$1
@@ -614,9 +641,12 @@ write_image()
local pool=$2
local image=$3
local count=$4
local size=$5

test -n "${size}" || size=4096

rbd --cluster ${cluster} -p ${pool} bench-write ${image} \
-        --io-size 4096 --io-threads 1 --io-total $((4096 * count)) \
+        --io-size ${size} --io-threads 1 --io-total $((size * count)) \
--io-pattern rand
}
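
For context, a usage sketch of the extended helpers, mirroring the calls the new test makes above (the CLUSTER2/POOL variables are supplied by the test harness):

# create_image now accepts an optional size plus pass-through "rbd create"
# arguments:
create_image ${CLUSTER2} ${POOL} laggy 128 --journal-object-size 64K

# set_image_meta wraps "rbd image-meta set":
set_image_meta ${CLUSTER2} ${POOL} laggy \
    conf_rbd_journal_max_concurrent_object_sets 1

# write_image gained an optional per-IO size, defaulting to 4096 bytes:
write_image ${CLUSTER2} ${POOL} laggy 20 16384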

8 changes: 5 additions & 3 deletions src/test/journal/mock/MockJournaler.h
@@ -102,6 +102,7 @@ struct MockJournaler {
Context*));

MOCK_METHOD2(register_client, void(const bufferlist &, Context *));
MOCK_METHOD1(unregister_client, void(Context *));
MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *,
Context *));
MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*));
@@ -159,9 +160,6 @@ struct MockJournalerProxy {
int register_client(const bufferlist &data) {
return -EINVAL;
}
-  void unregister_client(Context *ctx) {
-    ctx->complete(-EINVAL);
-  }

void allocate_tag(uint64_t, const bufferlist &,
cls::journal::Tag*, Context *on_finish) {
@@ -196,6 +194,10 @@ struct MockJournalerProxy {
MockJournaler::get_instance().register_client(data, on_finish);
}

void unregister_client(Context *on_finish) {
MockJournaler::get_instance().unregister_client(on_finish);
}

void get_client(const std::string &client_id, cls::journal::Client *client,
Context *on_finish) {
MockJournaler::get_instance().get_client(client_id, client, on_finish);
68 changes: 68 additions & 0 deletions src/test/rbd_mirror/test_ImageReplayer.cc
@@ -233,6 +233,9 @@ class TestImageReplayer : public ::testing::Test {
std::set<cls::journal::Client>::const_iterator c;
for (c = registered_clients.begin(); c != registered_clients.end(); c++) {
std::cout << __func__ << ": client: " << *c << std::endl;
if (c->state != cls::journal::CLIENT_STATE_CONNECTED) {
continue;
}
cls::journal::ObjectPositions object_positions =
c->commit_position.object_positions;
cls::journal::ObjectPositions::const_iterator p =
@@ -822,3 +825,68 @@ TEST_F(TestImageReplayer, MultipleReplayFailures_MultiEpoch) {
close_image(ictx);
}

TEST_F(TestImageReplayer, Disconnect)
{
bootstrap();

// Test start fails if disconnected

librbd::ImageCtx *ictx;

generate_test_data();
open_remote_image(&ictx);
for (int i = 0; i < TEST_IO_COUNT; ++i) {
write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
}
flush(ictx);
close_image(ictx);

std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));

C_SaferCond cond1;
m_replayer->start(&cond1);
ASSERT_EQ(-ENOTCONN, cond1.wait());

// Test start succeeds after resync

open_local_image(&ictx);
librbd::Journal<>::request_resync(ictx);
close_image(ictx);
C_SaferCond cond2;
m_replayer->start(&cond2);
ASSERT_EQ(-ENOTCONN, cond2.wait());
C_SaferCond delete_cond;
m_image_deleter->wait_for_scheduled_deletion(
m_local_ioctx.get_id(), m_replayer->get_global_image_id(), &delete_cond);
EXPECT_EQ(0, delete_cond.wait());

start();
wait_for_replay_complete();

// Test replay stopped after disconnect

open_remote_image(&ictx);
for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
}
flush(ictx);
close_image(ictx);

ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
bufferlist bl;
ASSERT_EQ(0, m_remote_ioctx.notify2(oid, bl, 5000, NULL));

wait_for_stopped();

// Test start fails after disconnect

C_SaferCond cond3;
m_replayer->start(&cond3);
ASSERT_EQ(-ENOTCONN, cond3.wait());
C_SaferCond cond4;
m_replayer->start(&cond4);
ASSERT_EQ(-ENOTCONN, cond4.wait());
}
60 changes: 54 additions & 6 deletions src/tools/rbd_mirror/ImageReplayer.cc
@@ -251,6 +251,15 @@ void ImageReplayer<I>::BootstrapProgressContext::update_progress(
}
}

template <typename I>
void ImageReplayer<I>::RemoteJournalerListener::handle_update(
::journal::JournalMetadata *) {
FunctionContext *ctx = new FunctionContext([this](int r) {
replayer->handle_remote_journal_metadata_updated();
});
replayer->m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads,
shared_ptr<ImageDeleter> image_deleter,
@@ -277,7 +286,8 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
m_progress_cxt(this),
-    m_resync_listener(new ResyncListener<I>(this))
+    m_resync_listener(new ResyncListener<I>(this)),
+    m_remote_listener(this)
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. When the image name becomes known on start the asok commands will be
@@ -509,6 +519,23 @@ void ImageReplayer<I>::handle_init_remote_journaler(int r) {
return;
}

m_remote_journaler->add_listener(&m_remote_listener);

cls::journal::Client client;
r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
if (r < 0) {
derr << "error retrieving remote journal client: " << cpp_strerror(r)
<< dendl;
on_start_fail(r, "error retrieving remote journal client");
return;
}

if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
dout(5) << "client flagged disconnected, stopping image replay" << dendl;
on_start_fail(-ENOTCONN, "disconnected");
return;
}

start_replay();
}

@@ -637,15 +664,18 @@ bool ImageReplayer<I>::on_start_interrupted()
}

template <typename I>
-void ImageReplayer<I>::stop(Context *on_finish, bool manual)
+void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
+                            const std::string& desc)
{
-  dout(20) << "on_finish=" << on_finish << dendl;
+  dout(20) << "on_finish=" << on_finish << ", manual=" << manual
+           << ", desc=" << desc << dendl;

image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
bool shut_down_replay = false;
bool running = true;
{
Mutex::Locker locker(m_lock);

if (!is_running_()) {
running = false;
} else {
@@ -684,14 +714,14 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual)
}

if (shut_down_replay) {
-    on_stop_journal_replay();
+    on_stop_journal_replay(r, desc);
} else if (on_finish != nullptr) {
on_finish->complete(0);
}
}

template <typename I>
-void ImageReplayer<I>::on_stop_journal_replay()
+void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
{
dout(20) << "enter" << dendl;

@@ -705,7 +735,7 @@ void ImageReplayer<I>::on_stop_journal_replay()
m_state = STATE_STOPPING;
}

-  set_state_description(0, "");
+  set_state_description(r, desc);
update_mirror_image_status(false, boost::none);
reschedule_update_status_task(-1);
shut_down(0);
@@ -1351,6 +1381,7 @@ void ImageReplayer<I>::shut_down(int r) {
ctx->complete(0);
});
ctx = new FunctionContext([this, ctx](int r) {
m_remote_journaler->remove_listener(&m_remote_listener);
m_remote_journaler->shut_down(ctx);
});
if (m_stopping_for_resync) {
@@ -1451,6 +1482,23 @@ void ImageReplayer<I>::handle_shut_down(int r) {
}
}

template <typename I>
void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
dout(20) << dendl;

cls::journal::Client client;
int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
if (r < 0) {
derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
return;
}

if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
dout(0) << "client flagged disconnected, stopping image replay" << dendl;
stop(nullptr, false, -ENOTCONN, "disconnected");
}
}

template <typename I>
std::string ImageReplayer<I>::to_string(const State state) {
switch (state) {
