Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

librbd: optionally unregister "laggy" journal clients #10378

Merged
merged 8 commits into from Sep 7, 2016
63 changes: 63 additions & 0 deletions qa/workunits/rbd/rbd_mirror.sh
Expand Up @@ -256,4 +256,67 @@ wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position'
compare_images ${POOL} ${image}

testlog "TEST: client disconnect"
image=laggy
create_image ${CLUSTER2} ${POOL} ${image} 128 --journal-object-size 64K
write_image ${CLUSTER2} ${POOL} ${image} 10

testlog " - replay stopped after disconnect"
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
disconnect_image ${CLUSTER2} ${POOL} ${image}
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

testlog " - replay started after resync requested"
request_resync_image ${CLUSTER1} ${POOL} ${image}
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}

testlog " - disconnected after max_concurrent_object_sets reached"
admin_daemon ${CLUSTER1} rbd mirror stop ${POOL}/${image}
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
set_image_meta ${CLUSTER2} ${POOL} ${image} \
conf_rbd_journal_max_concurrent_object_sets 1
write_image ${CLUSTER2} ${POOL} ${image} 20 16384
write_image ${CLUSTER2} ${POOL} ${image} 20 16384
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
set_image_meta ${CLUSTER2} ${POOL} ${image} \
conf_rbd_journal_max_concurrent_object_sets 0

testlog " - replay is still stopped (disconnected) after restart"
admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

testlog " - replay started after resync requested"
request_resync_image ${CLUSTER1} ${POOL} ${image}
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}

testlog " - rbd_mirroring_resync_after_disconnect config option"
set_image_meta ${CLUSTER1} ${POOL} ${image} \
conf_rbd_mirroring_resync_after_disconnect true
disconnect_image ${CLUSTER2} ${POOL} ${image}
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}
set_image_meta ${CLUSTER1} ${POOL} ${image} \
conf_rbd_mirroring_resync_after_disconnect false
disconnect_image ${CLUSTER2} ${POOL} ${image}
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

echo OK
38 changes: 34 additions & 4 deletions qa/workunits/rbd/rbd_mirror_helpers.sh
Expand Up @@ -418,7 +418,7 @@ get_position()
local status_log=${TEMPDIR}/${CLUSTER2}-${pool}-${image}.status
rbd --cluster ${cluster} -p ${pool} journal status --image ${image} |
tee ${status_log} >&2
sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*$/\1/p' \
sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*state=connected.*$/\1/p' \
${status_log}
}

Expand Down Expand Up @@ -488,13 +488,30 @@ test_status_in_pool_dir()
}

create_image()
{
local cluster=$1 ; shift
local pool=$1 ; shift
local image=$1 ; shift
local size=128

if [ -n "$1" ]; then
size=$1
shift
fi

rbd --cluster ${cluster} -p ${pool} create --size ${size} \
--image-feature layering,exclusive-lock,journaling $@ ${image}
}

set_image_meta()
{
local cluster=$1
local pool=$2
local image=$3
local key=$4
local val=$5

rbd --cluster ${cluster} -p ${pool} create --size 128 \
--image-feature layering,exclusive-lock,journaling ${image}
rbd --cluster ${cluster} -p ${pool} image-meta set ${image} $key $val
}

remove_image()
Expand Down Expand Up @@ -532,6 +549,16 @@ clone_image()
${clone_pool}/${clone_image} --image-feature layering,exclusive-lock,journaling
}

disconnect_image()
{
local cluster=$1
local pool=$2
local image=$3

rbd --cluster ${cluster} -p ${pool} journal client disconnect \
--image ${image}
}

create_snapshot()
{
local cluster=$1
Expand Down Expand Up @@ -614,9 +641,12 @@ write_image()
local pool=$2
local image=$3
local count=$4
local size=$5

test -n "${size}" || size=4096

rbd --cluster ${cluster} -p ${pool} bench-write ${image} \
--io-size 4096 --io-threads 1 --io-total $((4096 * count)) \
--io-size ${size} --io-threads 1 --io-total $((size * count)) \
--io-pattern rand
}

Expand Down
13 changes: 9 additions & 4 deletions src/cls/journal/cls_journal_client.cc
Expand Up @@ -296,13 +296,18 @@ void client_update_data(librados::ObjectWriteOperation *op,

int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id, cls::journal::ClientState state) {
librados::ObjectWriteOperation op;
client_update_state(&op, id, state);
return ioctx.operate(oid, &op);
}

void client_update_state(librados::ObjectWriteOperation *op,
const std::string &id,
cls::journal::ClientState state) {
bufferlist bl;
::encode(id, bl);
::encode(static_cast<uint8_t>(state), bl);

librados::ObjectWriteOperation op;
op.exec("journal", "client_update_state", bl);
return ioctx.operate(oid, &op);
op->exec("journal", "client_update_state", bl);
}

int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
Expand Down
3 changes: 3 additions & 0 deletions src/cls/journal/cls_journal_client.h
Expand Up @@ -53,6 +53,9 @@ void client_update_data(librados::ObjectWriteOperation *op,
const std::string &id, const bufferlist &data);
int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id, cls::journal::ClientState state);
void client_update_state(librados::ObjectWriteOperation *op,
const std::string &id,
cls::journal::ClientState state);

int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id);
Expand Down
2 changes: 2 additions & 0 deletions src/common/config_opts.h
Expand Up @@ -1212,6 +1212,7 @@ OPTION(rbd_tracing, OPT_BOOL, false) // true if LTTng-UST tracepoints should be
OPTION(rbd_validate_pool, OPT_BOOL, true) // true if empty pools should be validated for RBD compatibility
OPTION(rbd_validate_names, OPT_BOOL, true) // true if image specs should be validated
OPTION(rbd_auto_exclusive_lock_until_manual_request, OPT_BOOL, true) // whether to automatically acquire/release exclusive lock until it is explicitly requested, i.e. before we know the user of librbd is properly using the lock API
OPTION(rbd_mirroring_resync_after_disconnect, OPT_BOOL, false) // automatically start image resync after mirroring is disconnected due to being laggy

/*
* The following options change the behavior for librbd's image creation methods that
Expand Down Expand Up @@ -1252,6 +1253,7 @@ OPTION(rbd_journal_object_flush_bytes, OPT_INT, 0) // maximum number of pending
OPTION(rbd_journal_object_flush_age, OPT_DOUBLE, 0) // maximum age (in seconds) for pending commits
OPTION(rbd_journal_pool, OPT_STR, "") // pool for journal objects
OPTION(rbd_journal_max_payload_bytes, OPT_U32, 16384) // maximum journal payload size before splitting
OPTION(rbd_journal_max_concurrent_object_sets, OPT_INT, 0) // maximum number of object sets a journal client can be behind before it is automatically unregistered

/**
* RBD Mirror options
Expand Down
63 changes: 61 additions & 2 deletions src/journal/JournalMetadata.cc
Expand Up @@ -749,6 +749,10 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
Client client(m_client_id, bufferlist());
RegisteredClients::iterator it = refresh->registered_clients.find(client);
if (it != refresh->registered_clients.end()) {
if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
<< dendl;
}
m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
m_active_set = MAX(m_active_set, refresh->active_set);
m_registered_clients = refresh->registered_clients;
Expand Down Expand Up @@ -810,9 +814,11 @@ void JournalMetadata::handle_commit_position_task() {
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);

C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
m_commit_position_ctx = NULL;

ctx = schedule_laggy_clients_disconnect(ctx);

librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
Expand All @@ -839,7 +845,7 @@ void JournalMetadata::handle_watch_reset() {
if (r == -ENOENT) {
ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
} else {
lderr(m_cct) << __func__ << ": failed to watch journal"
lderr(m_cct) << __func__ << ": failed to watch journal: "
<< cpp_strerror(r) << dendl;
}
schedule_watch_reset();
Expand Down Expand Up @@ -1023,6 +1029,59 @@ void JournalMetadata::handle_notified(int r) {
ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}

Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
assert(m_lock.is_locked());

ldout(m_cct, 20) << __func__ << dendl;

if (m_settings.max_concurrent_object_sets <= 0) {
return on_finish;
}

Context *ctx = on_finish;

for (auto &c : m_registered_clients) {
if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
c.id == m_client_id ||
m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
continue;
}
const std::string &client_id = c.id;
uint64_t object_set = 0;
if (!c.commit_position.object_positions.empty()) {
auto &position = *(c.commit_position.object_positions.begin());
object_set = position.object_number / m_splay_width;
}

if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
ldout(m_cct, 1) << __func__ << ": " << client_id
<< ": scheduling disconnect" << dendl;

ctx = new FunctionContext([this, client_id, ctx](int r1) {
ldout(m_cct, 10) << __func__ << ": " << client_id
<< ": flagging disconnected" << dendl;

librados::ObjectWriteOperation op;
client::client_update_state(&op, client_id,
cls::journal::CLIENT_STATE_DISCONNECTED);

librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
});
}
}

if (ctx == on_finish) {
ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
}

return ctx;
}

std::ostream &operator<<(std::ostream &os,
const JournalMetadata::RegisteredClients &clients) {
os << "[";
Expand Down
2 changes: 2 additions & 0 deletions src/journal/JournalMetadata.h
Expand Up @@ -344,6 +344,8 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
void handle_watch_error(int err);
void handle_notified(int r);

Context *schedule_laggy_clients_disconnect(Context *on_finish);

friend std::ostream &operator<<(std::ostream &os,
const JournalMetadata &journal_metadata);
};
Expand Down
5 changes: 4 additions & 1 deletion src/journal/JournalTrimmer.cc
Expand Up @@ -141,8 +141,11 @@ void JournalTrimmer::handle_metadata_updated() {
uint64_t minimum_commit_set = active_set;
std::string minimum_client_id;

// TODO: add support for trimming past "laggy" clients
for (auto &client : registered_clients) {
if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
continue;
}

if (client.commit_position.object_positions.empty()) {
// client hasn't recorded any commits
minimum_commit_set = minimum_set;
Expand Down
3 changes: 3 additions & 0 deletions src/journal/Settings.h
Expand Up @@ -12,6 +12,9 @@ struct Settings {
double commit_interval = 5; ///< commit position throttle (in secs)
uint64_t max_fetch_bytes = 0; ///< 0 implies no limit
uint64_t max_payload_bytes = 0; ///< 0 implies object size limit
int max_concurrent_object_sets = 0; ///< 0 implies no limit
std::set<std::string> whitelisted_laggy_clients;
///< clients that mustn't be disconnected
};

} // namespace journal
Expand Down
6 changes: 5 additions & 1 deletion src/librbd/ImageCtx.cc
Expand Up @@ -944,7 +944,9 @@ struct C_InvalidateCache : public Context {
"rbd_journal_object_flush_bytes", false)(
"rbd_journal_object_flush_age", false)(
"rbd_journal_pool", false)(
"rbd_journal_max_payload_bytes", false);
"rbd_journal_max_payload_bytes", false)(
"rbd_journal_max_concurrent_object_sets", false)(
"rbd_mirroring_resync_after_disconnect", false);

md_config_t local_config_t;
std::map<std::string, bufferlist> res;
Expand Down Expand Up @@ -1000,6 +1002,8 @@ struct C_InvalidateCache : public Context {
ASSIGN_OPTION(journal_object_flush_age);
ASSIGN_OPTION(journal_pool);
ASSIGN_OPTION(journal_max_payload_bytes);
ASSIGN_OPTION(journal_max_concurrent_object_sets);
ASSIGN_OPTION(mirroring_resync_after_disconnect);
}

ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
Expand Down
2 changes: 2 additions & 0 deletions src/librbd/ImageCtx.h
Expand Up @@ -188,6 +188,8 @@ namespace librbd {
double journal_object_flush_age;
std::string journal_pool;
uint32_t journal_max_payload_bytes;
int journal_max_concurrent_object_sets;
bool mirroring_resync_after_disconnect;

LibrbdAdminSocketHook *asok_hook;

Expand Down
5 changes: 5 additions & 0 deletions src/librbd/Journal.cc
Expand Up @@ -1141,6 +1141,11 @@ void Journal<I>::create_journaler() {
::journal::Settings settings;
settings.commit_interval = m_image_ctx.journal_commit_age;
settings.max_payload_bytes = m_image_ctx.journal_max_payload_bytes;
settings.max_concurrent_object_sets =
m_image_ctx.journal_max_concurrent_object_sets;
// TODO: a configurable filter to exclude certain peers from being
// disconnected.
settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};

m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
m_image_ctx.md_ctx, m_image_ctx.id,
Expand Down
19 changes: 19 additions & 0 deletions src/test/cli/rbd/help.t
Expand Up @@ -36,6 +36,7 @@
import-diff Import an incremental diff.
info Show information about image size, striping,
etc.
journal client disconnect Flag image journal client as disconnected.
journal export Export image journal.
journal import Import image journal.
journal info Show information about image journal.
Expand Down Expand Up @@ -653,6 +654,24 @@
--format arg output format [plain, json, or xml]
--pretty-format pretty formatting (json and xml)

rbd help journal client disconnect
usage: rbd journal client disconnect [--pool <pool>] [--image <image>]
[--journal <journal>]
[--client-id <client-id>]
<journal-spec>

Flag image journal client as disconnected.

Positional arguments
<journal-spec> journal specification
(example: [<pool-name>/]<journal-name>)

Optional arguments
-p [ --pool ] arg pool name
--image arg image name
--journal arg journal name
--client-id arg client ID (or leave unspecified to disconnect all)

rbd help journal export
usage: rbd journal export [--pool <pool>] [--image <image>]
[--journal <journal>] [--path <path>] [--verbose]
Expand Down