Skip to content

Commit

Permalink
journal: make librados call async in ObjectRecorder
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo Dias <rdias@suse.com>
  • Loading branch information
rjfd committed Sep 15, 2016
1 parent 1dc889c commit a595a3e
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 144 deletions.
4 changes: 4 additions & 0 deletions src/journal/JournalMetadata.h
Expand Up @@ -97,6 +97,10 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
m_work_queue->queue(on_finish, r);
}

// Accessor for the work queue pointer held in m_work_queue (the same
// member used above to queue on_finish callbacks).
inline ContextWQ *get_work_queue() {
return m_work_queue;
}

// Returns a reference to the timer object that m_timer points to.
inline SafeTimer &get_timer() {
return *m_timer;
}
Expand Down
8 changes: 4 additions & 4 deletions src/journal/JournalRecorder.cc
Expand Up @@ -246,10 +246,10 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
uint64_t object_number, shared_ptr<Mutex> lock) {
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
object_number, lock, m_journal_metadata->get_timer(),
m_journal_metadata->get_timer_lock(), &m_object_handler,
m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
m_flush_age));
object_number, lock, m_journal_metadata->get_work_queue(),
m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
&m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
m_flush_bytes, m_flush_age));
return object_recorder;
}

Expand Down
89 changes: 57 additions & 32 deletions src/journal/ObjectRecorder.cc
Expand Up @@ -19,17 +19,18 @@ namespace journal {

ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, shared_ptr<Mutex> lock,
SafeTimer &timer, Mutex &timer_lock,
Handler *handler, uint8_t order,
uint32_t flush_interval, uint64_t flush_bytes,
double flush_age)
ContextWQ *work_queue, SafeTimer &timer,
Mutex &timer_lock, Handler *handler,
uint8_t order, uint32_t flush_interval,
uint64_t flush_bytes, double flush_age)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0),
m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
m_timer_lock(timer_lock), m_handler(handler), m_order(order),
m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0),
m_size(0), m_overflowed(false), m_object_closed(false),
m_in_flight_flushes(false), m_aio_scheduled(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
assert(m_handler != NULL);
Expand All @@ -40,6 +41,7 @@ ObjectRecorder::~ObjectRecorder() {
assert(m_append_buffers.empty());
assert(m_in_flight_tids.empty());
assert(m_in_flight_appends.empty());
assert(!m_aio_scheduled);
}

bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
Expand Down Expand Up @@ -180,7 +182,7 @@ bool ObjectRecorder::close() {

assert(!m_object_closed);
m_object_closed = true;
return m_in_flight_tids.empty();
return m_in_flight_tids.empty() && !m_aio_scheduled;
}

void ObjectRecorder::handle_append_task() {
Expand Down Expand Up @@ -268,7 +270,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
}

// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty()) {
if (m_in_flight_tids.empty() && !m_aio_scheduled) {
notify_handler();
}
return;
Expand All @@ -279,7 +281,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
assert(!append_buffers.empty());

m_in_flight_appends.erase(iter);
if (m_in_flight_appends.empty() && m_object_closed) {
if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
// all remaining unsent appends should be redirected to new object
notify_handler();
}
Expand Down Expand Up @@ -331,32 +333,55 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
assert(m_lock->is_locked());
assert(!append_buffers->empty());

uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);

librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);

for (AppendBuffers::iterator it = append_buffers->begin();
it != append_buffers->end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
it->first->set_flush_in_progress();
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
m_size += it->second.length();
}
m_in_flight_tids.insert(append_tid);
m_in_flight_appends[append_tid].swap(*append_buffers);

librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(append_flush, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
assert(r == 0);
rados_completion->release();

m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
append_buffers->begin(), append_buffers->end());
if (!m_aio_scheduled) {
m_op_work_queue->queue(new FunctionContext(
[this] (int r) {
Mutex::Locker locker(*m_lock);

m_aio_scheduled = false;

AppendBuffers append_buffers;
m_pending_buffers.swap(append_buffers);

uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);

librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);

for (AppendBuffers::iterator it = append_buffers.begin();
it != append_buffers.end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
}
m_in_flight_tids.insert(append_tid);
m_in_flight_appends[append_tid].swap(append_buffers);

librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(append_flush, NULL,
utils::rados_ctx_callback);
r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
assert(r == 0);
rados_completion->release();

}
));
m_aio_scheduled = true;
}
}

void ObjectRecorder::notify_handler() {
Expand Down
13 changes: 10 additions & 3 deletions src/journal/ObjectRecorder.h
Expand Up @@ -9,6 +9,7 @@
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "journal/FutureImpl.h"
#include <list>
#include <map>
Expand Down Expand Up @@ -38,9 +39,9 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {

ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, std::shared_ptr<Mutex> lock,
SafeTimer &timer, Mutex &timer_lock, Handler *handler,
uint8_t order, uint32_t flush_interval, uint64_t flush_bytes,
double flush_age);
ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
Handler *handler, uint8_t order, uint32_t flush_interval,
uint64_t flush_bytes, double flush_age);
~ObjectRecorder();

inline uint64_t get_object_number() const {
Expand Down Expand Up @@ -86,6 +87,7 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {
object_recorder->put();
}
// Forwards a flush request for `future` to object_recorder->flush(),
// after constructing a Mutex::Locker on the recorder's m_lock.
virtual void flush(const FutureImplPtr &future) {
// NOTE(review): presumably the locker keeps m_lock held until this
// function returns, so flush() runs with the lock taken -- confirm
// against Mutex::Locker.
Mutex::Locker locker(*(object_recorder->m_lock));
object_recorder->flush(future);
}
};
Expand Down Expand Up @@ -115,6 +117,8 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {
uint64_t m_object_number;
CephContext *m_cct;

ContextWQ *m_op_work_queue;

SafeTimer &m_timer;
Mutex &m_timer_lock;

Expand Down Expand Up @@ -147,6 +151,9 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {
bool m_in_flight_flushes;
Cond m_in_flight_flushes_cond;

AppendBuffers m_pending_buffers;
bool m_aio_scheduled;

void handle_append_task();
void cancel_append_task();
void schedule_append_task();
Expand Down
105 changes: 26 additions & 79 deletions src/librbd/Journal.cc
Expand Up @@ -31,33 +31,6 @@ namespace librbd {

namespace {

// Helper for supporting lambda move captures: bundles a value together with
// a callable and, when invoked, passes the stored value to the callable as
// its first argument, ahead of any arguments supplied at the call site.
template <typename T, typename F>
struct CaptureImpl {
  T t;  // stored value handed to f as its leading argument
  F f;  // callable invoked by operator()

  // Forwards both constructor arguments into the corresponding members.
  CaptureImpl(T &&value, F &&func)
    : t(std::forward<T>(value)), f(std::forward<F>(func)) {
  }

  // Calls f(t, args...) on a non-const object and returns its result.
  template <typename... Args>
  auto operator()(Args &&...args)
    -> decltype(f(t, std::forward<Args>(args)...)) {
    return f(t, std::forward<Args>(args)...);
  }

  // Const-qualified counterpart: t and f are accessed as const members.
  template <typename... Args>
  auto operator()(Args &&...args) const
    -> decltype(f(t, std::forward<Args>(args)...)) {
    return f(t, std::forward<Args>(args)...);
  }
};

// Factory for CaptureImpl: deduces T and F from the arguments and forwards
// both into the wrapper, so callers need not spell out the callable's type.
template <typename T, typename F>
CaptureImpl<T, F> make_capture(T &&t, F &&f) {
  // Braced return constructs the result through the same (non-explicit)
  // two-argument CaptureImpl constructor.
  return {std::forward<T>(t), std::forward<F>(f)};
}


struct C_DecodeTag : public Context {
CephContext *cct;
Mutex *lock;
Expand Down Expand Up @@ -320,8 +293,8 @@ Journal<I>::Journal(I &image_ctx)
m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
m_error_result(0), m_replay_handler(this), m_close_pending(false),
m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
m_pending_appends(0), m_blocking_writes(false),
m_journal_replay(NULL), m_metadata_listener(this) {
m_blocking_writes(false), m_journal_replay(NULL),
m_metadata_listener(this) {

CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
Expand Down Expand Up @@ -643,14 +616,6 @@ void Journal<I>::close(Context *on_finish) {
}

if (m_state == STATE_READY) {
if (m_pending_appends > 0) {
m_work_queue->queue(new FunctionContext(
[this, on_finish] (int r) {
close(on_finish);
}
));
return;
}
stop_recording();
}

Expand Down Expand Up @@ -879,49 +844,35 @@ uint64_t Journal<I>::append_io_events(journal::EventType event_type,

tid = ++m_event_tid;
assert(tid != 0);
}

m_pending_appends++;

FunctionContext *ctx = new FunctionContext(
make_capture(std::move(bufferlists), [this, requests, offset, length,
tid, event_type, flush_entry](const Bufferlists &bufferlists, int r) {
Futures futures;
for (auto &bl : bufferlists) {
assert(bl.length() <= m_max_append_size);
futures.push_back(m_journaler->append(m_tag_tid, bl));
}

{
Mutex::Locker event_locker(m_event_lock);
m_events[tid] = Event(futures, requests, offset, length);
m_events_cond.Signal();
}
Futures futures;
for (auto &bl : bufferlists) {
assert(bl.length() <= m_max_append_size);
futures.push_back(m_journaler->append(m_tag_tid, bl));
}

{
Mutex::Locker locker(m_lock);
m_pending_appends--;
}
{
Mutex::Locker event_locker(m_event_lock);
m_events[tid] = Event(futures, requests, offset, length);
}

CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "event=" << event_type << ", "
<< "new_reqs=" << requests.size() << ", "
<< "offset=" << offset << ", "
<< "length=" << length << ", "
<< "flush=" << flush_entry << ", tid=" << tid << dendl;

Context *on_safe = create_async_context_callback(
m_image_ctx, new C_IOEventSafe(this, tid));
if (flush_entry) {
futures.back().flush(on_safe);
} else {
futures.back().wait(on_safe);
}
}
));
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "event=" << event_type << ", "
<< "new_reqs=" << requests.size() << ", "
<< "offset=" << offset << ", "
<< "length=" << length << ", "
<< "flush=" << flush_entry << ", tid=" << tid << dendl;

m_work_queue->queue(ctx);
Context *on_safe = create_async_context_callback(
m_image_ctx, new C_IOEventSafe(this, tid));
if (flush_entry) {
futures.back().flush(on_safe);
} else {
futures.back().wait(on_safe);
}

return tid;
}

Expand Down Expand Up @@ -1087,10 +1038,6 @@ typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
CephContext *cct = m_image_ctx.cct;

typename Events::iterator it = m_events.find(tid);
while(it == m_events.end()) {
m_events_cond.Wait(m_event_lock);
it = m_events.find(tid);
}
assert(it != m_events.end());

Event &event = it->second;
Expand Down
2 changes: 0 additions & 2 deletions src/librbd/Journal.h
Expand Up @@ -293,8 +293,6 @@ class Journal {
Mutex m_event_lock;
uint64_t m_event_tid;
Events m_events;
Cond m_events_cond;
uint32_t m_pending_appends;

atomic_t m_op_tid;
TidToFutures m_op_futures;
Expand Down

0 comments on commit a595a3e

Please sign in to comment.