Skip to content

Commit

Permalink
Merge pull request #6742 from dillaman/wip-13913
Browse files Browse the repository at this point in the history
librbd: automatically flush IO after blocking write operations

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
  • Loading branch information
jdurgin committed Dec 1, 2015
2 parents 2f36909 + 9b0e359 commit 18c5e18
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 45 deletions.
30 changes: 22 additions & 8 deletions src/librbd/AioImageRequestWQ.cc
Expand Up @@ -177,12 +177,13 @@ void AioImageRequestWQ::block_writes(Context *on_blocked) {
++m_write_blockers;
ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
<< "num=" << m_write_blockers << dendl;
if (m_in_progress_writes > 0) {
if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) {
m_write_blocker_contexts.push_back(on_blocked);
return;
}
}
on_blocked->complete(0);

m_image_ctx.op_work_queue->queue(on_blocked);
}

void AioImageRequestWQ::unblock_writes() {
Expand Down Expand Up @@ -242,24 +243,25 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
req->send();
}

Contexts contexts;
bool writes_blocked = false;
{
Mutex::Locker locker(m_lock);
if (req->is_write_op()) {
assert(m_queued_writes > 0);
--m_queued_writes;

assert(m_in_progress_writes > 0);
if (--m_in_progress_writes == 0) {
contexts.swap(m_write_blocker_contexts);
if (--m_in_progress_writes == 0 && !m_write_blocker_contexts.empty()) {
writes_blocked = true;
}
}
}
delete req;

for (Contexts::iterator it = contexts.begin(); it != contexts.end(); ++it) {
(*it)->complete(0);
if (writes_blocked) {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
m_image_ctx.flush(new C_BlockedWrites(this));
}
delete req;
}

bool AioImageRequestWQ::is_journal_required() const {
Expand Down Expand Up @@ -325,4 +327,16 @@ void AioImageRequestWQ::handle_lock_updated(
}
}

void AioImageRequestWQ::handle_blocked_writes(int r) {
Contexts contexts;
{
Mutex::Locker locker(m_lock);
contexts.swap(m_write_blocker_contexts);
}

for (auto ctx : contexts) {
ctx->complete(0);
}
}

} // namespace librbd
12 changes: 12 additions & 0 deletions src/librbd/AioImageRequestWQ.h
Expand Up @@ -71,6 +71,17 @@ class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
}
};

struct C_BlockedWrites : public Context {
AioImageRequestWQ *aio_work_queue;
C_BlockedWrites(AioImageRequestWQ *_aio_work_queue)
: aio_work_queue(_aio_work_queue) {
}

virtual void finish(int r) {
aio_work_queue->handle_blocked_writes(r);
}
};

ImageCtx &m_image_ctx;
mutable Mutex m_lock;
Contexts m_write_blocker_contexts;
Expand All @@ -86,6 +97,7 @@ class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
void queue(AioImageRequest *req);

void handle_lock_updated(ImageWatcher::LockUpdateState state);
void handle_blocked_writes(int r);
};

} // namespace librbd
Expand Down
17 changes: 3 additions & 14 deletions src/librbd/ImageWatcher.cc
Expand Up @@ -2,6 +2,7 @@
// vim: ts=8 sw=2 smarttab
#include "librbd/ImageWatcher.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioImageRequestWQ.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
Expand Down Expand Up @@ -392,7 +393,6 @@ int ImageWatcher::release_lock()

// ensure all maint operations are canceled
m_image_ctx.cancel_async_requests();
m_image_ctx.flush_async_operations();

int r;
{
Expand All @@ -402,12 +402,8 @@ int ImageWatcher::release_lock()
// lock is being released
notify_listeners_updated_lock(LOCK_UPDATE_STATE_RELEASING);

RWLock::WLocker md_locker(m_image_ctx.md_lock);
r = m_image_ctx.flush();
if (r < 0) {
lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl;
goto err_cancel_unlock;
}
// AioImageRequestWQ will have blocked writes / flushed IO by this point
assert(m_image_ctx.aio_work_queue->writes_blocked());
}

m_image_ctx.owner_lock.get_write();
Expand All @@ -430,13 +426,6 @@ int ImageWatcher::release_lock()
}

return 0;

err_cancel_unlock:
m_image_ctx.owner_lock.get_write();
if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) {
m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
}
return r;
}

void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) {
Expand Down
17 changes: 0 additions & 17 deletions src/librbd/operation/SnapshotCreateRequest.cc
Expand Up @@ -27,9 +27,6 @@ std::ostream& operator<<(std::ostream& os,
case SnapshotCreateRequest::STATE_SUSPEND_AIO:
os << "SUSPEND_AIO";
break;
case SnapshotCreateRequest::STATE_FLUSH_AIO:
os << "FLUSH_AIO";
break;
case SnapshotCreateRequest::STATE_ALLOCATE_SNAP_ID:
os << "ALLOCATE_SNAP_ID";
break;
Expand Down Expand Up @@ -86,9 +83,6 @@ bool SnapshotCreateRequest::should_complete(int r) {
send_suspend_aio();
break;
case STATE_SUSPEND_AIO:
send_flush_aio();
break;
case STATE_FLUSH_AIO:
send_allocate_snap_id();
break;
case STATE_ALLOCATE_SNAP_ID:
Expand Down Expand Up @@ -156,17 +150,6 @@ void SnapshotCreateRequest::send_suspend_aio() {
m_image_ctx.aio_work_queue->block_writes(create_async_callback_context());
}

void SnapshotCreateRequest::send_flush_aio() {
assert(m_image_ctx.owner_lock.is_locked());

CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << " " << __func__ << dendl;
m_state = STATE_FLUSH_AIO;

// can issue a re-entrant callback if no IO to flush
m_image_ctx.flush(create_async_callback_context());
}

void SnapshotCreateRequest::send_allocate_snap_id() {
assert(m_image_ctx.owner_lock.is_locked());

Expand Down
7 changes: 1 addition & 6 deletions src/librbd/operation/SnapshotCreateRequest.h
Expand Up @@ -30,10 +30,7 @@ class SnapshotCreateRequest : public Request {
* STATE_SUSPEND_REQUESTS
* |
* v
* STATE_SUSPEND_AIO
* |
* v
* STATE_FLUSH_AIO * * * * * * * * * * * * * *
* STATE_SUSPEND_AIO * * * * * * * * * * * * *
* | *
* (retry) v *
* . . . > STATE_ALLOCATE_SNAP_ID * * *
Expand Down Expand Up @@ -61,7 +58,6 @@ class SnapshotCreateRequest : public Request {
enum State {
STATE_SUSPEND_REQUESTS,
STATE_SUSPEND_AIO,
STATE_FLUSH_AIO,
STATE_ALLOCATE_SNAP_ID,
STATE_CREATE_SNAP,
STATE_CREATE_OBJECT_MAP,
Expand Down Expand Up @@ -112,7 +108,6 @@ class SnapshotCreateRequest : public Request {

void send_suspend_requests();
void send_suspend_aio();
void send_flush_aio();
void send_allocate_snap_id();
void send_create_snap();
bool send_create_object_map();
Expand Down

0 comments on commit 18c5e18

Please sign in to comment.