Skip to content

Commit

Permalink
journal: flush commit positions should wait for refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed Feb 13, 2018
1 parent fef56ae commit a6eb075
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 60 deletions.
150 changes: 91 additions & 59 deletions src/journal/JournalMetadata.cc
Expand Up @@ -673,22 +673,17 @@ void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
void JournalMetadata::flush_commit_position() {
ldout(m_cct, 20) << __func__ << dendl;

Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
return;
}

cancel_commit_task();
handle_commit_position_task();
C_SaferCond ctx;
flush_commit_position(&ctx);
ctx.wait();
}

void JournalMetadata::flush_commit_position(Context *on_safe) {
ldout(m_cct, 20) << __func__ << dendl;

Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
// nothing to flush
if (on_safe != nullptr) {
m_work_queue->queue(on_safe, 0);
Expand All @@ -697,9 +692,12 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
}

if (on_safe != nullptr) {
m_commit_position_ctx = new C_FlushCommitPosition(
m_commit_position_ctx, on_safe);
m_flush_commit_position_ctxs.push_back(on_safe);
}
if (m_commit_position_ctx == nullptr) {
return;
}

cancel_commit_task();
handle_commit_position_task();
}
Expand Down Expand Up @@ -807,7 +805,6 @@ void JournalMetadata::cancel_commit_task() {
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
assert(m_commit_position_task_ctx != nullptr);

m_timer->cancel_event(m_commit_position_task_ctx);
m_commit_position_task_ctx = NULL;
}
Expand All @@ -818,7 +815,7 @@ void JournalMetadata::schedule_commit_task() {
assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
if (m_commit_position_task_ctx == nullptr) {
m_commit_position_task_ctx =
m_timer->add_event_after(m_settings.commit_interval,
new C_CommitPositionTask(this));
Expand All @@ -832,22 +829,51 @@ void JournalMetadata::handle_commit_position_task() {
<< "client_id=" << m_client_id << ", "
<< "commit_position=" << m_commit_position << dendl;

librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);
m_commit_position_task_ctx = nullptr;
Context* commit_position_ctx = nullptr;
std::swap(commit_position_ctx, m_commit_position_ctx);

Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
m_commit_position_ctx = NULL;
m_async_op_tracker.start_op();
++m_flush_commits_in_progress;

ctx = schedule_laggy_clients_disconnect(ctx);
Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
Contexts flush_commit_position_ctxs;
m_lock.Lock();
assert(m_flush_commits_in_progress > 0);
--m_flush_commits_in_progress;
if (m_flush_commits_in_progress == 0) {
std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
}
m_lock.Unlock();

librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
commit_position_ctx->complete(0);
for (auto ctx : flush_commit_position_ctxs) {
ctx->complete(0);
}
m_async_op_tracker.finish_op();
});
ctx = new C_NotifyUpdate(this, ctx);
ctx = new FunctionContext([this, ctx](int r) {
// manually kick off a refresh in case the notification is missed
// and ignore the next notification that we are about to send
m_lock.Lock();
++m_ignore_watch_notifies;
m_lock.Unlock();

refresh(ctx);
});
ctx = new FunctionContext([this, ctx](int r) {
schedule_laggy_clients_disconnect(ctx);
});

librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);

auto comp = librados::Rados::aio_create_completion(ctx, nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();

m_commit_position_task_ctx = NULL;
}

void JournalMetadata::schedule_watch_reset() {
Expand Down Expand Up @@ -884,6 +910,14 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
bufferlist bl;
m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);

{
Mutex::Locker locker(m_lock);
if (m_ignore_watch_notifies > 0) {
--m_ignore_watch_notifies;
return;
}
}

refresh(NULL);
}

Expand Down Expand Up @@ -1060,57 +1094,55 @@ void JournalMetadata::handle_notified(int r) {
ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}

Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
assert(m_lock.is_locked());

void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;

if (m_settings.max_concurrent_object_sets <= 0) {
return on_finish;
on_finish->complete(0);
return;
}

Context *ctx = on_finish;
{
Mutex::Locker locker(m_lock);
for (auto &c : m_registered_clients) {
if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
c.id == m_client_id ||
m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
continue;
}
const std::string &client_id = c.id;
uint64_t object_set = 0;
if (!c.commit_position.object_positions.empty()) {
auto &position = *(c.commit_position.object_positions.begin());
object_set = position.object_number / m_splay_width;
}

for (auto &c : m_registered_clients) {
if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
c.id == m_client_id ||
m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
continue;
}
const std::string &client_id = c.id;
uint64_t object_set = 0;
if (!c.commit_position.object_positions.empty()) {
auto &position = *(c.commit_position.object_positions.begin());
object_set = position.object_number / m_splay_width;
}

if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
ldout(m_cct, 1) << __func__ << ": " << client_id
<< ": scheduling disconnect" << dendl;
if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
ldout(m_cct, 1) << __func__ << ": " << client_id
<< ": scheduling disconnect" << dendl;

ctx = new FunctionContext([this, client_id, ctx](int r1) {
ldout(m_cct, 10) << __func__ << ": " << client_id
<< ": flagging disconnected" << dendl;
ctx = new FunctionContext([this, client_id, ctx](int r1) {
ldout(m_cct, 10) << __func__ << ": " << client_id
<< ": flagging disconnected" << dendl;

librados::ObjectWriteOperation op;
client::client_update_state(&op, client_id,
cls::journal::CLIENT_STATE_DISCONNECTED);
librados::ObjectWriteOperation op;
client::client_update_state(
&op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);

librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
});
auto comp = librados::Rados::aio_create_completion(
ctx, nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
});
}
}
}

if (ctx == on_finish) {
ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
}

return ctx;
ctx->complete(0);
}

std::ostream &operator<<(std::ostream &os,
Expand Down
6 changes: 5 additions & 1 deletion src/journal/JournalMetadata.h
Expand Up @@ -333,6 +333,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
size_t m_update_notifications;
Cond m_update_cond;

size_t m_ignore_watch_notifies = 0;
size_t m_refreshes_in_progress = 0;
Contexts m_refresh_ctxs;

Expand All @@ -341,6 +342,9 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
Context *m_commit_position_ctx;
Context *m_commit_position_task_ctx;

size_t m_flush_commits_in_progress = 0;
Contexts m_flush_commit_position_ctxs;

AsyncOpTracker m_async_op_tracker;

void handle_immutable_metadata(int r, Context *on_init);
Expand All @@ -358,7 +362,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
void handle_watch_error(int err);
void handle_notified(int r);

Context *schedule_laggy_clients_disconnect(Context *on_finish);
void schedule_laggy_clients_disconnect(Context *on_finish);

friend std::ostream &operator<<(std::ostream &os,
const JournalMetadata &journal_metadata);
Expand Down

0 comments on commit a6eb075

Please sign in to comment.