Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

librbd: automatically flush IO after blocking write operations #6742

Merged
merged 1 commit into from Dec 1, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/librbd/AioImageRequestWQ.cc
Expand Up @@ -165,12 +165,13 @@ void AioImageRequestWQ::block_writes(Context *on_blocked) {
++m_write_blockers;
ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
<< "num=" << m_write_blockers << dendl;
if (m_in_progress_writes > 0) {
if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) {
m_write_blocker_contexts.push_back(on_blocked);
return;
}
}
on_blocked->complete(0);

m_image_ctx.op_work_queue->queue(on_blocked);
}

void AioImageRequestWQ::unblock_writes() {
Expand Down Expand Up @@ -230,24 +231,25 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
req->send();
}

Contexts contexts;
bool writes_blocked = false;
{
Mutex::Locker locker(m_lock);
if (req->is_write_op()) {
assert(m_queued_writes > 0);
--m_queued_writes;

assert(m_in_progress_writes > 0);
if (--m_in_progress_writes == 0) {
contexts.swap(m_write_blocker_contexts);
if (--m_in_progress_writes == 0 && !m_write_blocker_contexts.empty()) {
writes_blocked = true;
}
}
}
delete req;

for (Contexts::iterator it = contexts.begin(); it != contexts.end(); ++it) {
(*it)->complete(0);
if (writes_blocked) {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
m_image_ctx.flush(new C_BlockedWrites(this));
}
delete req;
}

bool AioImageRequestWQ::is_journal_required() const {
Expand Down Expand Up @@ -313,4 +315,16 @@ void AioImageRequestWQ::handle_lock_updated(
}
}

void AioImageRequestWQ::handle_blocked_writes(int r) {
Contexts contexts;
{
Mutex::Locker locker(m_lock);
contexts.swap(m_write_blocker_contexts);
}

for (auto ctx : contexts) {
ctx->complete(0);
}
}

} // namespace librbd
12 changes: 12 additions & 0 deletions src/librbd/AioImageRequestWQ.h
Expand Up @@ -71,6 +71,17 @@ class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
}
};

struct C_BlockedWrites : public Context {
AioImageRequestWQ *aio_work_queue;
C_BlockedWrites(AioImageRequestWQ *_aio_work_queue)
: aio_work_queue(_aio_work_queue) {
}

virtual void finish(int r) {
aio_work_queue->handle_blocked_writes(r);
}
};

ImageCtx &m_image_ctx;
mutable Mutex m_lock;
Contexts m_write_blocker_contexts;
Expand All @@ -86,6 +97,7 @@ class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
void queue(AioImageRequest *req);

void handle_lock_updated(ImageWatcher::LockUpdateState state);
void handle_blocked_writes(int r);
};

} // namespace librbd
Expand Down
17 changes: 3 additions & 14 deletions src/librbd/ImageWatcher.cc
Expand Up @@ -2,6 +2,7 @@
// vim: ts=8 sw=2 smarttab
#include "librbd/ImageWatcher.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioImageRequestWQ.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
Expand Down Expand Up @@ -392,7 +393,6 @@ int ImageWatcher::release_lock()

// ensure all maint operations are canceled
m_image_ctx.cancel_async_requests();
m_image_ctx.flush_async_operations();

int r;
{
Expand All @@ -402,12 +402,8 @@ int ImageWatcher::release_lock()
// lock is being released
notify_listeners_updated_lock(LOCK_UPDATE_STATE_RELEASING);

RWLock::WLocker md_locker(m_image_ctx.md_lock);
r = m_image_ctx.flush();
if (r < 0) {
lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl;
goto err_cancel_unlock;
}
// AioImageRequestWQ will have blocked writes / flushed IO by this point
assert(m_image_ctx.aio_work_queue->writes_blocked());
}

m_image_ctx.owner_lock.get_write();
Expand All @@ -430,13 +426,6 @@ int ImageWatcher::release_lock()
}

return 0;

err_cancel_unlock:
m_image_ctx.owner_lock.get_write();
if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) {
m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
}
return r;
}

void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) {
Expand Down
17 changes: 0 additions & 17 deletions src/librbd/operation/SnapshotCreateRequest.cc
Expand Up @@ -27,9 +27,6 @@ std::ostream& operator<<(std::ostream& os,
case SnapshotCreateRequest::STATE_SUSPEND_AIO:
os << "SUSPEND_AIO";
break;
case SnapshotCreateRequest::STATE_FLUSH_AIO:
os << "FLUSH_AIO";
break;
case SnapshotCreateRequest::STATE_ALLOCATE_SNAP_ID:
os << "ALLOCATE_SNAP_ID";
break;
Expand Down Expand Up @@ -86,9 +83,6 @@ bool SnapshotCreateRequest::should_complete(int r) {
send_suspend_aio();
break;
case STATE_SUSPEND_AIO:
send_flush_aio();
break;
case STATE_FLUSH_AIO:
send_allocate_snap_id();
break;
case STATE_ALLOCATE_SNAP_ID:
Expand Down Expand Up @@ -156,17 +150,6 @@ void SnapshotCreateRequest::send_suspend_aio() {
m_image_ctx.aio_work_queue->block_writes(create_async_callback_context());
}

void SnapshotCreateRequest::send_flush_aio() {
assert(m_image_ctx.owner_lock.is_locked());

CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << " " << __func__ << dendl;
m_state = STATE_FLUSH_AIO;

// can issue a re-entrant callback if no IO to flush
m_image_ctx.flush(create_async_callback_context());
}

void SnapshotCreateRequest::send_allocate_snap_id() {
assert(m_image_ctx.owner_lock.is_locked());

Expand Down
7 changes: 1 addition & 6 deletions src/librbd/operation/SnapshotCreateRequest.h
Expand Up @@ -30,10 +30,7 @@ class SnapshotCreateRequest : public Request {
* STATE_SUSPEND_REQUESTS
* |
* v
* STATE_SUSPEND_AIO
* |
* v
* STATE_FLUSH_AIO * * * * * * * * * * * * * *
* STATE_SUSPEND_AIO * * * * * * * * * * * * *
* | *
* (retry) v *
* . . . > STATE_ALLOCATE_SNAP_ID * * *
Expand Down Expand Up @@ -61,7 +58,6 @@ class SnapshotCreateRequest : public Request {
enum State {
STATE_SUSPEND_REQUESTS,
STATE_SUSPEND_AIO,
STATE_FLUSH_AIO,
STATE_ALLOCATE_SNAP_ID,
STATE_CREATE_SNAP,
STATE_CREATE_OBJECT_MAP,
Expand Down Expand Up @@ -112,7 +108,6 @@ class SnapshotCreateRequest : public Request {

void send_suspend_requests();
void send_suspend_aio();
void send_flush_aio();
void send_allocate_snap_id();
void send_create_snap();
bool send_create_object_map();
Expand Down