Skip to content

Commit

Permalink
Merge pull request ceph#8809 from trociny/wip-rbd-mirror-asok-restart
Browse files Browse the repository at this point in the history
rbd-mirror: admin socket commands to start/stop/restart mirroring

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed May 3, 2016
2 parents 4175db6 + a22cf51 commit 2c88ef8
Show file tree
Hide file tree
Showing 7 changed files with 476 additions and 43 deletions.
80 changes: 68 additions & 12 deletions qa/workunits/rbd/rbd_mirror.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ daemon_pid_file()
fi
}

# Print a timestamped message to stdout and append the same line to
# ${TEMPDIR}/rbd-mirror.test.log.
#
# Arguments: the message words; they are joined with single spaces.
#
# The timestamp and the arguments are quoted so that the message is emitted
# verbatim: unquoted, a message containing glob characters (e.g. '*') or
# runs of whitespace would be expanded / collapsed by the shell.
testlog()
{
    echo "$(date '+%F %T')" "$@" | tee -a "${TEMPDIR}/rbd-mirror.test.log"
}

setup()
{
local c
Expand Down Expand Up @@ -195,6 +200,16 @@ stop_mirror()
rm -f $(daemon_pid_file "${cluster}")
}

# Send a command to the rbd-mirror daemon of the given cluster through its
# admin socket.
#
# Arguments:
#   $1    - cluster name, used to locate the daemon's admin socket file
#   $2... - the admin socket command and its arguments, passed to
#           'ceph --admin-daemon' unchanged
#
# Returns non-zero if the admin socket does not exist or the ceph command
# fails.
admin_daemon()
{
    local cluster=$1 ; shift

    local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")

    # Fail explicitly: 'set -e' is not honoured when this function is used
    # inside a pipeline or an '&&' list, so a bare 'test' would be ignored
    # there and ceph would be run against a missing socket.
    test -S "${asok_file}" || return 1

    # Quote the socket path and the arguments so that they are passed
    # through without further word splitting or glob expansion.
    ceph --admin-daemon "${asok_file}" "$@"
}

status()
{
local cluster daemon image
Expand Down Expand Up @@ -285,10 +300,7 @@ flush()
cmd="${cmd} ${POOL}/${image}"
fi

local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
test -S "${asok_file}"

ceph --admin-daemon ${asok_file} ${cmd}
admin_daemon "${cluster}" ${cmd}
}

test_image_replay_state()
Expand All @@ -298,11 +310,12 @@ test_image_replay_state()
local test_state=$3
local current_state=stopped

local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
test -S "${asok_file}"
admin_daemon "${cluster}" help |
fgrep "\"rbd mirror status ${POOL}/${image}\"" &&
admin_daemon "${cluster}" rbd mirror status ${POOL}/${image} |
grep -i 'state.*Replaying' &&
current_state=started

ceph --admin-daemon ${asok_file} help |
fgrep "\"rbd mirror status ${POOL}/${image}\"" && current_state=started
test "${test_state}" = "${current_state}"
}

Expand Down Expand Up @@ -471,7 +484,7 @@ set -xe

setup

echo "TEST: add image and test replay"
testlog "TEST: add image and test replay"
start_mirror ${CLUSTER1}
image=test
create_image ${CLUSTER2} ${image}
Expand All @@ -482,7 +495,7 @@ test_status_in_pool_dir ${CLUSTER1} ${image} 'up+replaying' 'master_position'
test_status_in_pool_dir ${CLUSTER2} ${image} 'down+unknown'
compare_images ${image}

echo "TEST: stop mirror, add image, start mirror and test replay"
testlog "TEST: stop mirror, add image, start mirror and test replay"
stop_mirror ${CLUSTER1}
image1=test1
create_image ${CLUSTER2} ${image1}
Expand All @@ -494,13 +507,56 @@ test_status_in_pool_dir ${CLUSTER1} ${image1} 'up+replaying' 'master_position'
test_status_in_pool_dir ${CLUSTER2} ${image1} 'down+unknown'
compare_images ${image1}

echo "TEST: test the first image is replaying after restart"
testlog "TEST: test the first image is replaying after restart"
write_image ${CLUSTER2} ${image} 100
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${image}
test_status_in_pool_dir ${CLUSTER1} ${image} 'up+replaying' 'master_position'
compare_images ${image}

echo "TEST: failover and failback"
# Exercise the rbd-mirror admin socket start/stop/restart commands against
# the daemon of ${CLUSTER1}, checking the replay state of both test images
# after each command.
testlog "TEST: stop/start/restart mirror via admin socket"

# 'stop' without an argument: both images are expected to stop replaying.
admin_daemon ${CLUSTER1} rbd mirror stop
wait_for_image_replay_stopped ${CLUSTER1} ${image}
wait_for_image_replay_stopped ${CLUSTER1} ${image1}

# 'start' without an argument: both images are expected to replay again.
admin_daemon ${CLUSTER1} rbd mirror start
wait_for_image_replay_started ${CLUSTER1} ${image}
wait_for_image_replay_started ${CLUSTER1} ${image1}

# 'restart' while running: both images must end up replaying.
admin_daemon ${CLUSTER1} rbd mirror restart
wait_for_image_replay_started ${CLUSTER1} ${image}
wait_for_image_replay_started ${CLUSTER1} ${image1}

# 'restart' while stopped: stop everything first, then restart.
admin_daemon ${CLUSTER1} rbd mirror stop
wait_for_image_replay_stopped ${CLUSTER1} ${image}
wait_for_image_replay_stopped ${CLUSTER1} ${image1}

admin_daemon ${CLUSTER1} rbd mirror restart
wait_for_image_replay_started ${CLUSTER1} ${image}
wait_for_image_replay_started ${CLUSTER1} ${image1}

# 'stop' addressed by the ${CLUSTER2} name (presumably the peer cluster --
# confirm against the daemon's command registration): both images are
# expected to stop.
admin_daemon ${CLUSTER1} rbd mirror stop ${CLUSTER2}
wait_for_image_replay_stopped ${CLUSTER1} ${image}
wait_for_image_replay_stopped ${CLUSTER1} ${image1}

# 'start' addressed to a single image: only that image is waited for.
admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
wait_for_image_replay_started ${CLUSTER1} ${image}

# 'start' without an argument is expected to bring up the remaining image.
admin_daemon ${CLUSTER1} rbd mirror start
wait_for_image_replay_started ${CLUSTER1} ${image1}

# 'start' by the ${CLUSTER2} name while both images were already seen
# replaying; only the command's exit status is checked (via 'set -e').
admin_daemon ${CLUSTER1} rbd mirror start ${CLUSTER2}

# 'restart' addressed to a single image.
admin_daemon ${CLUSTER1} rbd mirror restart ${POOL}/${image}
wait_for_image_replay_started ${CLUSTER1} ${image}

# 'restart' addressed by the ${CLUSTER2} name: both images must replay.
admin_daemon ${CLUSTER1} rbd mirror restart ${CLUSTER2}
wait_for_image_replay_started ${CLUSTER1} ${image}
wait_for_image_replay_started ${CLUSTER1} ${image1}

# Finally check that the argument-less 'flush' and 'status' commands succeed.
admin_daemon ${CLUSTER1} rbd mirror flush
admin_daemon ${CLUSTER1} rbd mirror status

testlog "TEST: failover and failback"
start_mirror ${CLUSTER2}

# failover
Expand Down
178 changes: 153 additions & 25 deletions src/tools/rbd_mirror/ImageReplayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,48 @@ class StatusCommand : public ImageReplayerAdminSocketCommand {
ImageReplayer<I> *replayer;
};

template <typename I>
class StartCommand : public ImageReplayerAdminSocketCommand {
public:
explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}

bool call(Formatter *f, stringstream *ss) {
replayer->start(nullptr, nullptr, true);
return true;
}

private:
ImageReplayer<I> *replayer;
};

template <typename I>
class StopCommand : public ImageReplayerAdminSocketCommand {
public:
explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}

bool call(Formatter *f, stringstream *ss) {
replayer->stop(nullptr, true);
return true;
}

private:
ImageReplayer<I> *replayer;
};

template <typename I>
class RestartCommand : public ImageReplayerAdminSocketCommand {
public:
explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}

bool call(Formatter *f, stringstream *ss) {
replayer->restart();
return true;
}

private:
ImageReplayer<I> *replayer;
};

template <typename I>
class FlushCommand : public ImageReplayerAdminSocketCommand {
public:
Expand Down Expand Up @@ -122,6 +164,27 @@ class ImageReplayerAdminSocketHook : public AdminSocketHook {
commands[command] = new StatusCommand<I>(replayer);
}

command = "rbd mirror start " + name;
r = admin_socket->register_command(command, command, this,
"start rbd mirror " + name);
if (r == 0) {
commands[command] = new StartCommand<I>(replayer);
}

command = "rbd mirror stop " + name;
r = admin_socket->register_command(command, command, this,
"stop rbd mirror " + name);
if (r == 0) {
commands[command] = new StopCommand<I>(replayer);
}

command = "rbd mirror restart " + name;
r = admin_socket->register_command(command, command, this,
"restart rbd mirror " + name);
if (r == 0) {
commands[command] = new RestartCommand<I>(replayer);
}

command = "rbd mirror flush " + name;
r = admin_socket->register_command(command, command, this,
"flush rbd mirror " + name);
Expand Down Expand Up @@ -197,6 +260,21 @@ ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot
remote_image_id),
m_progress_cxt(this)
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. Once the image name becomes known on start, the asok commands are
// re-registered under the "local_pool_name/local_image_name" name.

std::string pool_name;
int r = m_remote->pool_reverse_lookup(m_remote_pool_id, &pool_name);
if (r < 0) {
derr << "error resolving remote pool " << m_remote_pool_id
<< ": " << cpp_strerror(r) << dendl;
pool_name = stringify(m_remote_pool_id);
}
m_name = pool_name + "/" + m_global_image_id;

CephContext *cct = static_cast<CephContext *>(m_local->cct());
m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
}

template <typename I>
Expand Down Expand Up @@ -224,23 +302,41 @@ void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {

template <typename I>
void ImageReplayer<I>::start(Context *on_finish,
const BootstrapParams *bootstrap_params)
const BootstrapParams *bootstrap_params,
bool manual)
{
assert(m_on_start_finish == nullptr);
assert(m_on_stop_finish == nullptr);
dout(20) << "on_finish=" << on_finish << dendl;

int r = 0;
{
Mutex::Locker locker(m_lock);
assert(is_stopped_());

m_state = STATE_STARTING;
m_last_r = 0;
m_state_desc.clear();
m_on_start_finish = on_finish;
if (!is_stopped_()) {
derr << "already running" << dendl;
r = -EINVAL;
} else if (m_manual_stop && !manual) {
dout(5) << "stopped manually, ignoring start without manual flag"
<< dendl;
r = -EPERM;
} else {
m_state = STATE_STARTING;
m_last_r = 0;
m_state_desc.clear();
m_on_start_finish = on_finish;
m_manual_stop = false;
}
}

if (r < 0) {
if (on_finish) {
on_finish->complete(r);
}
return;
}

int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
if (r < 0) {
derr << "error opening ioctx for remote pool " << m_remote_pool_id
<< ": " << cpp_strerror(r) << dendl;
Expand Down Expand Up @@ -309,11 +405,21 @@ void ImageReplayer<I>::handle_bootstrap(int r) {

{
Mutex::Locker locker(m_lock);
m_name = m_local_ioctx.get_pool_name() + "/" + m_local_image_ctx->name;

CephContext *cct = static_cast<CephContext *>(m_local->cct());
delete m_asok_hook;
m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
std::string name = m_local_ioctx.get_pool_name() + "/" +
m_local_image_ctx->name;
if (m_name != name) {
m_name = name;
if (m_asok_hook) {
// Re-register asok commands using the new name.
delete m_asok_hook;
m_asok_hook = nullptr;
}
}
if (!m_asok_hook) {
CephContext *cct = static_cast<CephContext *>(m_local->cct());
m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
}
}

update_mirror_image_status();
Expand Down Expand Up @@ -483,27 +589,39 @@ bool ImageReplayer<I>::on_start_interrupted()
}

template <typename I>
void ImageReplayer<I>::stop(Context *on_finish)
void ImageReplayer<I>::stop(Context *on_finish, bool manual)
{
dout(20) << "on_finish=" << on_finish << dendl;

bool shut_down_replay = false;
bool running = true;
{
Mutex::Locker locker(m_lock);
assert(is_running_());
if (!is_running_()) {
running = false;
} else {
if (!is_stopped_()) {
if (m_state == STATE_STARTING) {
dout(20) << "interrupting start" << dendl;
} else {
dout(20) << "interrupting replay" << dendl;
shut_down_replay = true;
}

if (!is_stopped_()) {
if (m_state == STATE_STARTING) {
dout(20) << "interrupting start" << dendl;
} else {
dout(20) << "interrupting replay" << dendl;
shut_down_replay = true;
assert(m_on_stop_finish == nullptr);
std::swap(m_on_stop_finish, on_finish);
m_stop_requested = true;
m_manual_stop = manual;
}
}
}

assert(m_on_stop_finish == nullptr);
std::swap(m_on_stop_finish, on_finish);
m_stop_requested = true;
if (!running) {
derr << "not running" << dendl;
if (on_finish) {
on_finish->complete(-EINVAL);
}
return;
}

if (shut_down_replay) {
Expand Down Expand Up @@ -596,9 +714,6 @@ void ImageReplayer<I>::on_stop_local_image_close_finish(int r)

m_remote_ioctx.close();

delete m_asok_hook;
m_asok_hook = nullptr;

Context *on_finish(nullptr);

{
Expand Down Expand Up @@ -645,6 +760,19 @@ void ImageReplayer<I>::handle_replay_ready()
replay_flush();
}

/**
 * Stop the replayer and, once the stop request has completed, start it
 * again with the "manual" flag set.
 *
 * @param on_finish  context handed to start(); it is completed by the start
 *                   path, not by the intermediate stop
 */
template <typename I>
void ImageReplayer<I>::restart(Context *on_finish)
{
  // The result of the stop request is intentionally not inspected: a start
  // is attempted even if the stop reported an error.
  Context *start_after_stop = new FunctionContext(
    [this, on_finish](int /* stop_result */) {
      start(on_finish, nullptr, true);
    });

  stop(start_after_stop);
}

template <typename I>
void ImageReplayer<I>::flush(Context *on_finish)
{
Expand Down

0 comments on commit 2c88ef8

Please sign in to comment.