Skip to content

Commit

Permalink
Merge pull request #7906 from dillaman/wip-14869
Browse files Browse the repository at this point in the history
journal: re-use common threads between journalers

Conflicts:
	src/journal/JournalPlayer.cc
	src/librbd/Journal.cc
	src/test/rbd_mirror/image_replay.cc
	src/tools/rbd_mirror/ImageReplayer.h
	src/tools/rbd_mirror/Mirror.cc

(merged interface changes to ImageReplayer, and reduced scope for
change to JournalPlayer due to pr #7884 (wip-14663)).

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
  • Loading branch information
jdurgin committed Mar 9, 2016
2 parents cfed9b6 + 7fd230f commit 3d404f2
Show file tree
Hide file tree
Showing 33 changed files with 575 additions and 227 deletions.
14 changes: 7 additions & 7 deletions src/journal/FutureImpl.cc
Expand Up @@ -2,15 +2,15 @@
// vim: ts=8 sw=2 smarttab

#include "journal/FutureImpl.h"
#include "common/Finisher.h"
#include "journal/JournalMetadata.h"
#include "journal/Utils.h"

namespace journal {

FutureImpl::FutureImpl(Finisher &finisher, uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid)
: RefCountedObject(NULL, 0), m_finisher(finisher), m_tag_tid(tag_tid),
m_entry_tid(entry_tid), m_commit_tid(commit_tid),
FutureImpl::FutureImpl(JournalMetadataPtr journal_metadata, uint64_t tag_tid,
uint64_t entry_tid, uint64_t commit_tid)
: RefCountedObject(NULL, 0), m_journal_metadata(journal_metadata),
m_tag_tid(tag_tid), m_entry_tid(entry_tid), m_commit_tid(commit_tid),
m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
Expand Down Expand Up @@ -51,7 +51,7 @@ void FutureImpl::flush(Context *on_safe) {
}

if (complete && on_safe != NULL) {
m_finisher.queue(on_safe, m_return_value);
m_journal_metadata->queue(on_safe, m_return_value);
} else if (flush_handler) {
// attached to journal object -- instruct it to flush all entries through
// this one. possible to become detached while lock is released, so flush
Expand All @@ -69,7 +69,7 @@ void FutureImpl::wait(Context *on_safe) {
return;
}
}
m_finisher.queue(on_safe, m_return_value);
m_journal_metadata->queue(on_safe, m_return_value);
}

bool FutureImpl::is_complete() const {
Expand Down
9 changes: 5 additions & 4 deletions src/journal/FutureImpl.h
Expand Up @@ -14,11 +14,11 @@
#include "include/assert.h"

class Context;
class Finisher;

namespace journal {

class FutureImpl;
class JournalMetadata;
typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;

class FutureImpl : public RefCountedObject, boost::noncopyable {
Expand All @@ -29,10 +29,11 @@ class FutureImpl : public RefCountedObject, boost::noncopyable {
virtual void get() = 0;
virtual void put() = 0;
};
typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;

FutureImpl(Finisher &finisher, uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid);
FutureImpl(JournalMetadataPtr journal_metadata, uint64_t tag_tid,
uint64_t entry_tid, uint64_t commit_tid);

void init(const FutureImplPtr &prev_future);

Expand Down Expand Up @@ -95,7 +96,7 @@ class FutureImpl : public RefCountedObject, boost::noncopyable {
virtual void finish(int r) {}
};

Finisher &m_finisher;
JournalMetadataPtr m_journal_metadata;
uint64_t m_tag_tid;
uint64_t m_entry_tid;
uint64_t m_commit_tid;
Expand Down
52 changes: 16 additions & 36 deletions src/journal/JournalMetadata.cc
Expand Up @@ -4,7 +4,6 @@
#include "journal/JournalMetadata.h"
#include "journal/Utils.h"
#include "common/errno.h"
#include "common/Finisher.h"
#include "common/Timer.h"
#include "cls/journal/cls_journal_client.h"
#include <functional>
Expand Down Expand Up @@ -231,14 +230,15 @@ struct C_FlushCommitPosition : public Context {

} // anonymous namespace

JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
Mutex *timer_lock, librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id,
double commit_interval)
: RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
m_splay_width(0), m_pool_id(-1), m_initialized(false), m_finisher(NULL),
m_timer(NULL), m_timer_lock("JournalMetadata::m_timer_lock"),
m_splay_width(0), m_pool_id(-1), m_initialized(false),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
m_watch_handle(0), m_minimum_set(0), m_active_set(0),
m_update_notifications(0), m_commit_position_ctx(NULL),
Expand All @@ -249,20 +249,14 @@ JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,

JournalMetadata::~JournalMetadata() {
if (m_initialized) {
shutdown();
shut_down();
}
}

void JournalMetadata::init(Context *on_init) {
assert(!m_initialized);
m_initialized = true;

m_finisher = new Finisher(m_cct);
m_finisher->start();

m_timer = new SafeTimer(m_cct, m_timer_lock, true);
m_timer->init();

int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
if (r < 0) {
lderr(m_cct) << __func__ << ": failed to watch journal"
Expand All @@ -275,7 +269,7 @@ void JournalMetadata::init(Context *on_init) {
get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, ctx);
}

void JournalMetadata::shutdown() {
void JournalMetadata::shut_down() {

ldout(m_cct, 20) << __func__ << dendl;

Expand All @@ -292,24 +286,10 @@ void JournalMetadata::shutdown() {

flush_commit_position();

if (m_timer != NULL) {
Mutex::Locker locker(m_timer_lock);
m_timer->shutdown();
delete m_timer;
m_timer = NULL;
}

if (m_finisher != NULL) {
m_finisher->stop();
delete m_finisher;
m_finisher = NULL;
}

librados::Rados rados(m_ioctx);
rados.watch_flush();

m_async_op_tracker.wait_for_ops();
m_ioctx.aio_flush();
}

void JournalMetadata::get_immutable_metadata(uint8_t *order,
Expand Down Expand Up @@ -458,7 +438,7 @@ void JournalMetadata::set_active_set(uint64_t object_set) {
void JournalMetadata::flush_commit_position() {
ldout(m_cct, 20) << __func__ << dendl;

Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
return;
Expand All @@ -471,12 +451,12 @@ void JournalMetadata::flush_commit_position() {
void JournalMetadata::flush_commit_position(Context *on_safe) {
ldout(m_cct, 20) << __func__ << dendl;

Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
// nothing to flush
if (on_safe != nullptr) {
m_finisher->queue(on_safe, 0);
m_work_queue->queue(on_safe, 0);
}
return;
}
Expand Down Expand Up @@ -567,7 +547,7 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
void JournalMetadata::cancel_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;

assert(m_timer_lock.is_locked());
assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
assert(m_commit_position_task_ctx != nullptr);
Expand All @@ -579,7 +559,7 @@ void JournalMetadata::cancel_commit_task() {
void JournalMetadata::schedule_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;

assert(m_timer_lock.is_locked());
assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
Expand All @@ -589,7 +569,7 @@ void JournalMetadata::schedule_commit_task() {
}

void JournalMetadata::handle_commit_position_task() {
assert(m_timer_lock.is_locked());
assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": "
<< "client_id=" << m_client_id << ", "
Expand All @@ -612,12 +592,12 @@ void JournalMetadata::handle_commit_position_task() {
}

void JournalMetadata::schedule_watch_reset() {
assert(m_timer_lock.is_locked());
assert(m_timer_lock->is_locked());
m_timer->add_event_after(0.1, new C_WatchReset(this));
}

void JournalMetadata::handle_watch_reset() {
assert(m_timer_lock.is_locked());
assert(m_timer_lock->is_locked());
if (!m_initialized) {
return;
}
Expand All @@ -644,7 +624,7 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {

void JournalMetadata::handle_watch_error(int err) {
lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);

// release old watch on error
Expand Down Expand Up @@ -681,7 +661,7 @@ void JournalMetadata::committed(uint64_t commit_tid,
ObjectSetPosition commit_position;
Context *stale_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
assert(commit_tid > m_commit_position_tid);

Expand Down
17 changes: 9 additions & 8 deletions src/journal/JournalMetadata.h
Expand Up @@ -10,6 +10,7 @@
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/AsyncOpTracker.h"
#include <boost/intrusive_ptr.hpp>
Expand All @@ -21,7 +22,6 @@
#include <string>
#include "include/assert.h"

class Finisher;
class SafeTimer;

namespace journal {
Expand All @@ -46,12 +46,13 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
virtual void handle_update(JournalMetadata *) = 0;
};

JournalMetadata(librados::IoCtx &ioctx, const std::string &oid,
JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
librados::IoCtx &ioctx, const std::string &oid,
const std::string &client_id, double commit_interval);
~JournalMetadata();

void init(Context *on_init);
void shutdown();
void shut_down();

void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
int64_t *pool_id, Context *on_finish);
Expand Down Expand Up @@ -84,15 +85,15 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
return m_pool_id;
}

inline Finisher &get_finisher() {
return *m_finisher;
inline void queue(Context *on_finish, int r) {
m_work_queue->queue(on_finish, r);
}

inline SafeTimer &get_timer() {
return *m_timer;
}
inline Mutex &get_timer_lock() {
return m_timer_lock;
return *m_timer_lock;
}

void set_minimum_set(uint64_t object_set);
Expand Down Expand Up @@ -283,9 +284,9 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
int64_t m_pool_id;
bool m_initialized;

Finisher *m_finisher;
ContextWQ *m_work_queue;
SafeTimer *m_timer;
Mutex m_timer_lock;
Mutex *m_timer_lock;

mutable Mutex m_lock;

Expand Down
5 changes: 2 additions & 3 deletions src/journal/JournalPlayer.cc
Expand Up @@ -2,7 +2,6 @@
// vim: ts=8 sw=2 smarttab

#include "journal/JournalPlayer.h"
#include "common/Finisher.h"
#include "journal/Entry.h"
#include "journal/ReplayHandler.h"
#include "journal/Utils.h"
Expand Down Expand Up @@ -562,7 +561,7 @@ void JournalPlayer::notify_entries_available() {
m_handler_notified = true;

ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_journal_metadata->queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
}

Expand All @@ -571,7 +570,7 @@ void JournalPlayer::notify_complete(int r) {
m_handler_notified = true;

ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_journal_metadata->queue(new C_HandleComplete(
m_replay_handler), r);
}

Expand Down
7 changes: 3 additions & 4 deletions src/journal/JournalRecorder.cc
Expand Up @@ -2,7 +2,6 @@
// vim: ts=8 sw=2 smarttab

#include "journal/JournalRecorder.h"
#include "common/Finisher.h"
#include "journal/Entry.h"
#include "journal/Utils.h"

Expand Down Expand Up @@ -32,7 +31,7 @@ struct C_Flush : public Context {
}
if (pending_flushes.dec() == 0) {
// ensure all prior callback have been flushed as well
journal_metadata->get_finisher().queue(on_finish, ret_val);
journal_metadata->queue(on_finish, ret_val);
delete this;
}
}
Expand Down Expand Up @@ -81,8 +80,8 @@ Future JournalRecorder::append(uint64_t tag_tid,
ObjectRecorderPtr object_ptr = get_object(splay_offset);
uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
object_ptr->get_object_number(), tag_tid, entry_tid);
FutureImplPtr future(new FutureImpl(m_journal_metadata->get_finisher(),
tag_tid, entry_tid, commit_tid));
FutureImplPtr future(new FutureImpl(m_journal_metadata, tag_tid, entry_tid,
commit_tid));
future->init(m_prev_future);
m_prev_future = future;

Expand Down
1 change: 0 additions & 1 deletion src/journal/JournalTrimmer.cc
Expand Up @@ -5,7 +5,6 @@
#include "journal/Utils.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "common/Finisher.h"
#include <limits>

#define dout_subsys ceph_subsys_journaler
Expand Down

0 comments on commit 3d404f2

Please sign in to comment.