Skip to content

Commit

Permalink
rbd-mirror: re-register remote journal if client flagged disconnected
Browse files Browse the repository at this point in the history
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
  • Loading branch information
Mykola Golub committed Aug 1, 2016
1 parent 5d277f1 commit 7706e6c
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 8 deletions.
21 changes: 21 additions & 0 deletions qa/workunits/rbd/rbd_mirror.sh
Expand Up @@ -90,6 +90,27 @@ wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image1}
admin_daemon ${CLUSTER1} rbd mirror flush
admin_daemon ${CLUSTER1} rbd mirror status

# Scenario: a mirror peer whose replay is stopped while the primary keeps
# writing; the test checks the peer's journal position before the writes,
# after the writes, and again after replay is restarted.
testlog "TEST: unregister laggy client"
image1=laggy
# Create the image with 64K journal objects and set
# conf_rbd_journal_max_concurrent_object_sets to 1 -- presumably so that a
# modest amount of writes is enough to outrun the stopped peer; confirm.
create_image ${CLUSTER2} ${POOL} ${image1} --journal-object-size 64K
set_image_meta ${CLUSTER2} ${POOL} ${image1} \
conf_rbd_journal_max_concurrent_object_sets 1
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image1}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image1}

# Stop replay on CLUSTER1; a mirror position must still be reported here.
admin_daemon ${CLUSTER1} rbd mirror stop ${POOL}/${image1}
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image1}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image1})"
# Write to the primary while the peer is stopped; afterwards the test expects
# that no mirror position is reported any more.
write_image ${CLUSTER2} ${POOL} ${image1} 20 16384
write_image ${CLUSTER2} ${POOL} ${image1} 20 16384
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image1})"

# Restart replay: a position must be reported again and both images must match.
admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image1}
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image1}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image1}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image1})"
compare_images ${POOL} ${image1}

testlog "TEST: failover and failback"
start_mirror ${CLUSTER2}

Expand Down
22 changes: 18 additions & 4 deletions qa/workunits/rbd/rbd_mirror_helpers.sh
Expand Up @@ -418,7 +418,7 @@ get_position()
local status_log=${TEMPDIR}/${CLUSTER2}-${pool}-${image}.status
rbd --cluster ${cluster} -p ${pool} journal status --image ${image} |
tee ${status_log} >&2
sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*$/\1/p' \
sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*state=connected.*$/\1/p' \
${status_log}
}

Expand Down Expand Up @@ -488,13 +488,24 @@ test_status_in_pool_dir()
}

# Create a 128 MB image with layering, exclusive-lock and journaling enabled.
#
# Usage: create_image <cluster> <pool> <image> [extra "rbd create" options...]
#
# Any arguments after the image name are forwarded verbatim to "rbd create"
# (e.g. --journal-object-size 64K).
create_image()
{
    local cluster=$1 ; shift
    local pool=$1 ; shift
    local image=$1 ; shift

    # "$@" (quoted) forwards each extra option as exactly one word, so option
    # values containing whitespace or glob characters are not re-split.
    rbd --cluster "${cluster}" -p "${pool}" create --size 128 \
        --image-feature layering,exclusive-lock,journaling "$@" "${image}"
}

# Set a single image-meta key/value pair on an existing image.
#
# Usage: set_image_meta <cluster> <pool> <image> <key> <value>
set_image_meta()
{
    local cluster=$1
    local pool=$2
    local image=$3
    local key=$4
    local val=$5

    # Operate on the image passed as the third argument, not on whatever the
    # caller's global ${image1} happens to hold.
    rbd --cluster "${cluster}" -p "${pool}" image-meta set "${image}" \
        "${key}" "${val}"
}

remove_image()
Expand Down Expand Up @@ -614,9 +625,12 @@ write_image()
local pool=$2
local image=$3
local count=$4
local size=$5

test -n "${size}" || size=4096

rbd --cluster ${cluster} -p ${pool} bench-write ${image} \
--io-size 4096 --io-threads 1 --io-total $((4096 * count)) \
--io-size ${size} --io-threads 1 --io-total $((size * count)) \
--io-pattern rand
}

Expand Down
8 changes: 5 additions & 3 deletions src/test/journal/mock/MockJournaler.h
Expand Up @@ -102,6 +102,7 @@ struct MockJournaler {
Context*));

MOCK_METHOD2(register_client, void(const bufferlist &, Context *));
MOCK_METHOD1(unregister_client, void(Context *));
MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *,
Context *));
MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*));
Expand Down Expand Up @@ -159,9 +160,6 @@ struct MockJournalerProxy {
// Synchronous overload taking only the client data: this mock proxy does not
// forward it anywhere and always returns -EINVAL.
int register_client(const bufferlist &data) {
return -EINVAL;
}
// Stub that completes the supplied context immediately with -EINVAL.
// NOTE(review): this looks like the pre-change stub shown as a removed line in
// the diff; a forwarding unregister_client(Context *) with the same signature
// appears further down in this struct -- confirm only one of them remains.
void unregister_client(Context *ctx) {
ctx->complete(-EINVAL);
}

void allocate_tag(uint64_t, const bufferlist &,
cls::journal::Tag*, Context *on_finish) {
Expand Down Expand Up @@ -196,6 +194,10 @@ struct MockJournalerProxy {
MockJournaler::get_instance().register_client(data, on_finish);
}

// Forwards the asynchronous unregister request to the MockJournaler
// singleton so tests can set expectations on it; on_finish is passed through
// unchanged.
void unregister_client(Context *on_finish) {
MockJournaler::get_instance().unregister_client(on_finish);
}

void get_client(const std::string &client_id, cls::journal::Client *client,
Context *on_finish) {
MockJournaler::get_instance().get_client(client_id, client, on_finish);
Expand Down
52 changes: 51 additions & 1 deletion src/tools/rbd_mirror/ImageReplayer.cc
Expand Up @@ -251,6 +251,15 @@ void ImageReplayer<I>::BootstrapProgressContext::update_progress(
}
}

// Journal metadata listener hook for the remote journaler. The metadata
// argument is ignored; the actual handling is queued on the replayer's work
// queue (presumably so it does not run in the notifier's calling context --
// confirm).
template <typename I>
void ImageReplayer<I>::RemoteJournalerListener::handle_update(
  ::journal::JournalMetadata *) {
  replayer->m_threads->work_queue->queue(
    new FunctionContext([this](int) {
        replayer->handle_remote_journal_metadata_updated();
      }),
    0);
}

template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads,
shared_ptr<ImageDeleter> image_deleter,
Expand All @@ -277,7 +286,8 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
m_progress_cxt(this),
m_resync_listener(new ResyncListener<I>(this))
m_resync_listener(new ResyncListener<I>(this)),
m_remote_listener(this)
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. When the image name becomes known on start the asok commands will be
Expand Down Expand Up @@ -500,6 +510,26 @@ void ImageReplayer<I>::handle_init_remote_journaler(int r) {
return;
}

m_remote_journaler->add_listener(&m_remote_listener);

cls::journal::Client client;
r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
if (r < 0) {
derr << "error retrieving remote journal client: " << cpp_strerror(r)
<< dendl;
on_start_fail(r, "error retrieving remote journal client");
return;
}

dout(20) << "client=" << client << dendl;

if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
dout(0) << "client flagged disconnected, stopping image replay" << dendl;
// XXX: Schedule re-sync after configurable interval?
on_start_fail(0, "client flagged disconnected");
return;
}

start_replay();
}

Expand Down Expand Up @@ -1339,6 +1369,7 @@ void ImageReplayer<I>::shut_down(int r, Context *on_start) {
ctx->complete(0);
});
ctx = new FunctionContext([this, ctx](int r) {
m_remote_journaler->remove_listener(&m_remote_listener);
m_remote_journaler->shut_down(ctx);
});
if (m_stopping_for_resync) {
Expand Down Expand Up @@ -1436,6 +1467,25 @@ void ImageReplayer<I>::handle_shut_down(int r, Context *on_start) {
}
}

// Re-reads this mirror's client record from the remote journaler's cache and
// stops replay if the record is no longer in the connected state. A failure
// to look up the record is logged and otherwise ignored.
template <typename I>
void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
  dout(20) << dendl;

  cls::journal::Client remote_client;
  const int ret = m_remote_journaler->get_cached_client(m_local_mirror_uuid,
                                                        &remote_client);
  if (ret < 0) {
    derr << "failed to retrieve client: " << cpp_strerror(ret) << dendl;
    return;
  }

  dout(20) << "client=" << remote_client << dendl;

  // nothing to do while the client is still considered connected
  if (remote_client.state == cls::journal::CLIENT_STATE_CONNECTED) {
    return;
  }

  dout(0) << "client flagged disconnected, stopping image replay" << dendl;
  stop(); // XXX: Schedule re-sync after configurable interval?
}

template <typename I>
std::string ImageReplayer<I>::to_string(const State state) {
switch (state) {
Expand Down
11 changes: 11 additions & 0 deletions src/tools/rbd_mirror/ImageReplayer.h
Expand Up @@ -14,6 +14,7 @@
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
#include "cls/rbd/cls_rbd_types.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Types.h"
Expand Down Expand Up @@ -270,6 +271,14 @@ class ImageReplayer {
librbd::journal::TagData m_replay_tag_data;
librbd::journal::EventEntry m_event_entry;

// Listener registered with the remote journaler (see add_listener /
// remove_listener in ImageReplayer.cc); it relays metadata update
// notifications back to the owning ImageReplayer.
struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
  ImageReplayer *replayer;  // owning replayer, not owned by the listener

  // explicit: an ImageReplayer* must never convert implicitly to a listener
  explicit RemoteJournalerListener(ImageReplayer *replayer)
    : replayer(replayer) {
  }

  // override: let the compiler verify this matches the base class hook
  void handle_update(::journal::JournalMetadata *) override;
} m_remote_listener;

struct C_ReplayCommitted : public Context {
ImageReplayer *replayer;
ReplayEntry replay_entry;
Expand Down Expand Up @@ -306,6 +315,8 @@ class ImageReplayer {
void handle_mirror_status_update(int r);
void reschedule_update_status_task(int new_interval = 0);

void handle_remote_journal_metadata_updated();

void shut_down(int r, Context *on_start);
void handle_shut_down(int r, Context *on_start);

Expand Down
30 changes: 30 additions & 0 deletions src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc
Expand Up @@ -194,6 +194,10 @@ void BootstrapRequest<I>::handle_get_client(int r) {
derr << ": failed to retrieve client: " << cpp_strerror(r) << dendl;
finish(r);
return;
} else if (m_client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
dout(0) << ": client flagged disconnected, re-registering" << dendl;
unregister_client();
return;
} else if (decode_client_meta()) {
// skip registration if it already exists
open_remote_image();
Expand All @@ -203,6 +207,32 @@ void BootstrapRequest<I>::handle_get_client(int r) {
register_client();
}

// Bootstrap step UNREGISTER_CLIENT: asks the journaler to drop this client's
// registration; the result is delivered to handle_unregister_client().
template <typename I>
void BootstrapRequest<I>::unregister_client() {
  dout(20) << dendl;

  update_progress("UNREGISTER_CLIENT");

  m_journaler->unregister_client(
    create_context_callback<
      BootstrapRequest<I>, &BootstrapRequest<I>::handle_unregister_client>(
        this));
}

// Completion of UNREGISTER_CLIENT: on success continue with a fresh
// registration, otherwise log the error and finish the request with it.
template <typename I>
void BootstrapRequest<I>::handle_unregister_client(int r) {
  dout(20) << ": r=" << r << dendl;

  if (r >= 0) {
    register_client();
    return;
  }

  derr << ": failed to unregister with remote journal: " << cpp_strerror(r)
       << dendl;
  finish(r);
}

template <typename I>
void BootstrapRequest<I>::register_client() {
dout(20) << dendl;
Expand Down
6 changes: 6 additions & 0 deletions src/tools/rbd_mirror/image_replayer/BootstrapRequest.h
Expand Up @@ -95,6 +95,9 @@ class BootstrapRequest : public BaseRequest {
* GET_CLIENT * * * * * * * * * * * * * * * *
* | *
* v (skip if not needed) * (error)
* UNREGISTER_CLIENT * * * * * * * * * * * *
* | *
* v (skip if not needed) * (error)
* REGISTER_CLIENT * * * * * * * * * * * * *
* | *
* v *
Expand Down Expand Up @@ -173,6 +176,9 @@ class BootstrapRequest : public BaseRequest {
void get_client();
void handle_get_client(int r);

void unregister_client();
void handle_unregister_client(int r);

void register_client();
void handle_register_client(int r);

Expand Down

0 comments on commit 7706e6c

Please sign in to comment.