Skip to content

Commit

Permalink
Merge pull request #7484: librbd: flattening an rbd image with active…
Browse files Browse the repository at this point in the history
… IO can lead to hang

Reviewed-by: Loic Dachary <ldachary@redhat.com>
  • Loading branch information
Loic Dachary committed Feb 8, 2016
2 parents 8ecb48f + 3b4a5f9 commit 5ff4eef
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 104 deletions.
20 changes: 19 additions & 1 deletion src/common/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,33 @@ class ThreadPool : public md_config_obs_t {
class PointerWQ : public WorkQueue_ {
public:
PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, ti, sti), m_pool(p) {
: WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) {
m_pool->add_work_queue(this);
}
// Destructor: unregisters this queue from its owning pool, then checks that
// no dequeued item is still being processed.
~PointerWQ() {
m_pool->remove_work_queue(this);
// m_processing is bumped when an item is popped off m_items and dropped
// again in _void_process_finish(); non-zero here means work is in flight.
assert(m_processing == 0);
}
// Wait for this queue's work to complete.
//
// A queue that has nothing queued and nothing being processed returns
// immediately, so it does not end up waiting for other queues in the pool
// to finish processing.
void drain() {
  bool idle;
  {
    // m_items and m_processing are inspected under the pool lock
    Mutex::Locker l(m_pool->_lock);
    idle = (m_processing == 0 && m_items.empty());
  }
  if (idle) {
    return;
  }
  m_pool->drain(this);
}
// Append an item to the back of this queue and signal the pool's condition
// variable once. Both steps happen while holding the pool lock.
void queue(T *item) {
Mutex::Locker l(m_pool->_lock);
m_items.push_back(item);
m_pool->_cond.SignalOne();
}
// Locked wrapper around _empty(): takes the pool lock, then returns whatever
// _empty() reports.
bool empty() {
Mutex::Locker l(m_pool->_lock);
return _empty();
}
protected:
virtual void _clear() {
assert(m_pool->_lock.is_locked());
Expand All @@ -383,6 +396,7 @@ class ThreadPool : public md_config_obs_t {
return NULL;
}

++m_processing;
T *item = m_items.front();
m_items.pop_front();
return item;
Expand All @@ -391,6 +405,9 @@ class ThreadPool : public md_config_obs_t {
process(reinterpret_cast<T *>(item));
}
// Bookkeeping hook run when an item has finished processing: decrements the
// in-flight counter. Must be called with the pool lock held (asserted below).
// The item pointer itself is not used here.
virtual void _void_process_finish(void *item) {
assert(m_pool->_lock.is_locked());
// guard against underflow of the unsigned counter
assert(m_processing > 0);
--m_processing;
}

virtual void process(T *item) = 0;
Expand All @@ -409,6 +426,7 @@ class ThreadPool : public md_config_obs_t {
private:
ThreadPool *m_pool;
std::list<T *> m_items;
uint32_t m_processing;
};
private:
vector<WorkQueue_*> work_queues;
Expand Down
1 change: 1 addition & 0 deletions src/librbd/AsyncOperation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "librbd/AsyncOperation.h"
#include "librbd/ImageCtx.h"
#include "common/dout.h"
#include "common/WorkQueue.h"
#include "include/assert.h"

#define dout_subsys ceph_subsys_rbd
Expand Down
196 changes: 135 additions & 61 deletions src/librbd/ImageCtx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,89 @@ class ThreadPoolSingleton : public ThreadPool {
}
};

// Context that, when completed, kicks off an asynchronous cache flush on
// image_ctx and hands on_safe to that flush as its completion.
// The result code passed to finish() is not inspected.
struct C_FlushCache : public Context {
ImageCtx *image_ctx;
Context *on_safe;

C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe)
: image_ctx(_image_ctx), on_safe(_on_safe) {
}
virtual void finish(int r) {
// successful cache flush indicates all IO is now safe
// hold owner_lock for read across the call to flush_cache()
RWLock::RLocker owner_locker(image_ctx->owner_lock);
image_ctx->flush_cache(on_safe);
}
};

struct C_InvalidateCache : public Context {
ImageCtx *image_ctx;
bool purge_on_error;
bool reentrant_safe;
Context *on_finish;

C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error,
bool _reentrant_safe, Context *_on_finish)
: image_ctx(_image_ctx), purge_on_error(_purge_on_error),
reentrant_safe(_reentrant_safe), on_finish(_on_finish) {
}
virtual void finish(int r) {
assert(image_ctx->cache_lock.is_locked());
CephContext *cct = image_ctx->cct;

if (r == -EBLACKLISTED) {
lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl;
image_ctx->object_cacher->purge_set(image_ctx->object_set);
} else if (r != 0 && purge_on_error) {
lderr(cct) << "invalidate cache encountered error "
<< cpp_strerror(r) << " !Purging cache..." << dendl;
image_ctx->object_cacher->purge_set(image_ctx->object_set);
} else if (r != 0) {
lderr(cct) << "flush_cache returned " << r << dendl;
}

loff_t unclean = image_ctx->object_cacher->release_set(
image_ctx->object_set);
if (unclean == 0) {
r = 0;
} else {
lderr(cct) << "could not release all objects from cache: "
<< unclean << " bytes remain" << dendl;
r = -EBUSY;
}

if (reentrant_safe) {
on_finish->complete(r);
} else {
image_ctx->op_work_queue->queue(on_finish, r);
}
}

};

// Trampoline context: instead of completing on_finish directly, it queues
// on_finish together with the received result on the image's op_work_queue.
struct C_AsyncCallback : public Context {
  ImageCtx *image_ctx;
  Context *on_finish;

  C_AsyncCallback(ImageCtx *_image_ctx, Context *_on_finish)
    : image_ctx(_image_ctx), on_finish(_on_finish) {
  }

  virtual void finish(int r) {
    image_ctx->op_work_queue->queue(on_finish, r);
  }
};

// Arrange for on_finish to fire once the image's async operations are flushed.
//
// If any async op is registered, on_finish is attached as a flush context to
// the op at the front of ictx->async_ops. Otherwise on_finish is completed
// with 0 right here, after async_ops_lock has been dropped.
void _flush_async_operations(ImageCtx *ictx, Context *on_finish) {
  bool attached = false;
  {
    Mutex::Locker async_ops_locker(ictx->async_ops_lock);
    if (!ictx->async_ops.empty()) {
      ldout(ictx->cct, 20) << "flush async operations: " << on_finish << " "
                           << "count=" << ictx->async_ops.size() << dendl;
      ictx->async_ops.front()->add_flush_context(on_finish);
      attached = true;
    }
  }

  if (!attached) {
    // nothing in flight: complete outside the lock
    on_finish->complete(0);
  }
}

} // anonymous namespace

const string ImageCtx::METADATA_CONF_PREFIX = "conf_";
Expand Down Expand Up @@ -125,6 +208,11 @@ class ThreadPoolSingleton : public ThreadPool {
}
delete[] format_string;

md_ctx.aio_flush();
data_ctx.aio_flush();
op_work_queue->drain();
aio_work_queue->drain();

delete op_work_queue;
delete aio_work_queue;
}
Expand Down Expand Up @@ -647,30 +735,24 @@ class ThreadPoolSingleton : public ThreadPool {
}
}

void ImageCtx::flush_cache_aio(Context *onfinish) {
int ImageCtx::flush_cache() {
C_SaferCond cond_ctx;
flush_cache(&cond_ctx);

ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
int r = cond_ctx.wait();
ldout(cct, 20) << "finished flushing cache" << dendl;

return r;
}

// Asynchronous cache flush: asks the object cacher to flush this image's
// object set and passes onfinish as the completion for that flush.
//
// The caller must hold owner_lock (asserted). cache_lock is held for the
// duration of the flush_set() call via a scoped guard, so it is released on
// every exit path, matching the Mutex::Locker usage elsewhere in this file.
void ImageCtx::flush_cache(Context *onfinish) {
  assert(owner_lock.is_locked());
  Mutex::Locker cache_locker(cache_lock);
  object_cacher->flush_set(object_set, onfinish);
}

// Blocking cache flush built on a hand-rolled mutex/condition pair: issues
// flush_cache_aio() with a C_SafeCond completion, waits on `cond` until
// `done` becomes true, then returns `r`.
int ImageCtx::flush_cache() {
int r = 0;
Mutex mylock("librbd::ImageCtx::flush_cache");
Cond cond;
// NOTE(review): `done` has no initializer here; presumably the C_SafeCond
// constructor clears it before the wait loop reads it -- confirm.
bool done;
// presumably the completion stores its result in r, sets done and signals
// cond under mylock -- confirm against C_SafeCond
Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
flush_cache_aio(onfinish);
mylock.Lock();
// loop so that a wakeup without done being set goes back to waiting
while (!done) {
ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
cond.Wait(mylock);
}
mylock.Unlock();
ldout(cct, 20) << "finished flushing cache" << dendl;
return r;
}

int ImageCtx::shutdown_cache() {
flush_async_operations();

Expand All @@ -681,20 +763,19 @@ class ThreadPoolSingleton : public ThreadPool {
}

int ImageCtx::invalidate_cache(bool purge_on_error) {
int result;
C_SaferCond ctx;
invalidate_cache(&ctx);
result = ctx.wait();

if (result && purge_on_error) {
cache_lock.Lock();
if (object_cacher != NULL) {
lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl;
object_cacher->purge_set(object_set);
}
cache_lock.Unlock();
flush_async_operations();
if (object_cacher == NULL) {
return 0;
}

cache_lock.Lock();
object_cacher->release_set(object_set);
cache_lock.Unlock();

C_SaferCond ctx;
flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx));

int result = ctx.wait();
return result;
}

Expand All @@ -708,29 +789,7 @@ class ThreadPoolSingleton : public ThreadPool {
object_cacher->release_set(object_set);
cache_lock.Unlock();

flush_cache_aio(new FunctionContext(boost::bind(
&ImageCtx::invalidate_cache_completion, this, _1, on_finish)));
}

// Second half of a cache invalidation, given the result r of the preceding
// flush. Requires cache_lock to be held (asserted).
//
// Purges the object set when r is -EBLACKLISTED, logs any other non-zero r,
// then releases the object set. on_finish is queued on op_work_queue with 0
// if everything was released, or -EBUSY if bytes remain in the cache.
void ImageCtx::invalidate_cache_completion(int r, Context *on_finish) {
assert(cache_lock.is_locked());
if (r == -EBLACKLISTED) {
lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl;
object_cacher->purge_set(object_set);
} else if (r != 0) {
lderr(cct) << "flush_cache returned " << r << dendl;
}

// the incoming r is overwritten below: the reported result reflects only
// whether release_set() left anything behind
loff_t unclean = object_cacher->release_set(object_set);
if (unclean == 0) {
r = 0;
} else {
lderr(cct) << "could not release all objects from cache: "
<< unclean << " bytes remain" << dendl;
r = -EBUSY;
}

// completion is queued rather than invoked inline
op_work_queue->queue(on_finish, r);
}
flush_cache(new C_InvalidateCache(this, false, false, on_finish));
}

void ImageCtx::clear_nonexistence_cache() {
Expand Down Expand Up @@ -777,20 +836,35 @@ class ThreadPoolSingleton : public ThreadPool {

void ImageCtx::flush_async_operations() {
C_SaferCond ctx;
flush_async_operations(&ctx);
_flush_async_operations(this, &ctx);
ctx.wait();
}

void ImageCtx::flush_async_operations(Context *on_finish) {
Mutex::Locker l(async_ops_lock);
if (async_ops.empty()) {
on_finish->complete(0);
return;
// complete context in clean thread context
_flush_async_operations(this, new C_AsyncCallback(this, on_finish));
}

// Blocking flush of the image. The caller must hold owner_lock (asserted).
//
// Waits for outstanding async operations first; then, if an object cacher is
// present, flushes the cache. Returns the negative result of the cache flush
// if it reported one, otherwise 0.
int ImageCtx::flush() {
  assert(owner_lock.is_locked());

  flush_async_operations();
  if (object_cacher == NULL) {
    return 0;
  }

  const int r = flush_cache();
  return (r < 0) ? r : 0;
}

ldout(cct, 20) << "flush async operations: " << on_finish << " "
<< "count=" << async_ops.size() << dendl;
async_ops.front()->add_flush_context(on_finish);
// Asynchronous flush of the image. The caller must hold owner_lock (asserted).
//
// on_safe is passed to flush_async_operations(). When an object cacher is
// present it is first wrapped in a C_FlushCache, so the cache flush runs
// after all in-flight AIO ops have completed and before on_safe fires.
void ImageCtx::flush(Context *on_safe) {
  assert(owner_lock.is_locked());

  Context *flush_ctx = on_safe;
  if (object_cacher != NULL) {
    // flush cache after completing all in-flight AIO ops
    flush_ctx = new C_FlushCache(this, on_safe);
  }
  flush_async_operations(flush_ctx);
}

void ImageCtx::cancel_async_requests() {
Expand Down
6 changes: 4 additions & 2 deletions src/librbd/ImageCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,11 @@ namespace librbd {
void write_to_cache(object_t o, const bufferlist& bl, size_t len,
uint64_t off, Context *onfinish, int fadvise_flags);
void user_flushed();
void flush_cache_aio(Context *onfinish);
int flush_cache();
void flush_cache(Context *onfinish);
int shutdown_cache();
int invalidate_cache(bool purge_on_error=false);
void invalidate_cache(Context *on_finish);
void invalidate_cache_completion(int r, Context *on_finish);
void clear_nonexistence_cache();
int register_watch();
void unregister_watch();
Expand All @@ -233,6 +232,9 @@ namespace librbd {
void flush_async_operations();
void flush_async_operations(Context *on_finish);

int flush();
void flush(Context *on_safe);

void cancel_async_requests();
void apply_metadata_confs();
};
Expand Down
2 changes: 1 addition & 1 deletion src/librbd/ImageWatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ bool ImageWatcher::release_lock()
{
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
RWLock::WLocker md_locker(m_image_ctx.md_lock);
librbd::_flush(&m_image_ctx);
m_image_ctx.flush();
}

m_image_ctx.owner_lock.get_write();
Expand Down
Loading

0 comments on commit 5ff4eef

Please sign in to comment.