Skip to content

Commit

Permalink
Merge branch 'wip-16223' into wip-mgolub-testing
Browse files Browse the repository at this point in the history
rbd-mirror: reduce memory footprint during journal replay #10341
  • Loading branch information
Mykola Golub committed Jul 22, 2016
2 parents aacba7f + 574be74 commit 08fe431
Show file tree
Hide file tree
Showing 51 changed files with 1,092 additions and 814 deletions.
1 change: 1 addition & 0 deletions qa/workunits/rbd/rbd_mirror_helpers.sh
Expand Up @@ -197,6 +197,7 @@ start_mirror()
--pid-file=$(daemon_pid_file "${cluster}") \
--log-file=${TEMPDIR}/rbd-mirror.${cluster}_daemon.\$cluster.\$pid.log \
--admin-socket=${TEMPDIR}/rbd-mirror.${cluster}_daemon.\$cluster.asok \
--rbd-mirror-journal-poll-age=1 \
--debug-rbd=30 --debug-journaler=30 \
--debug-rbd_mirror=30 \
--daemonize=true
Expand Down
4 changes: 4 additions & 0 deletions src/common/config_opts.h
Expand Up @@ -1248,10 +1248,14 @@ OPTION(rbd_journal_object_flush_interval, OPT_INT, 0) // maximum number of pendi
OPTION(rbd_journal_object_flush_bytes, OPT_INT, 0) // maximum number of pending bytes per journal object
OPTION(rbd_journal_object_flush_age, OPT_DOUBLE, 0) // maximum age (in seconds) for pending commits
OPTION(rbd_journal_pool, OPT_STR, "") // pool for journal objects
OPTION(rbd_journal_max_payload_bytes, OPT_U32, 16384) // maximum journal payload size before splitting

/**
* RBD Mirror options
*/
OPTION(rbd_mirror_journal_commit_age, OPT_DOUBLE, 5) // commit time interval, seconds
OPTION(rbd_mirror_journal_poll_age, OPT_DOUBLE, 5) // maximum age (in seconds) between successive journal polls
OPTION(rbd_mirror_journal_max_fetch_bytes, OPT_U32, 32768) // maximum bytes to read from each journal data object per fetch
OPTION(rbd_mirror_sync_point_update_age, OPT_DOUBLE, 30) // number of seconds between each update of the image sync point object number
OPTION(rbd_mirror_concurrent_image_syncs, OPT_U32, 5) // maximum number of image syncs in parallel

Expand Down
2 changes: 1 addition & 1 deletion src/journal/Entry.cc
Expand Up @@ -9,7 +9,7 @@

#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
#define dout_prefix *_dout << "Entry: "
#define dout_prefix *_dout << "Entry: " << this << " "

namespace journal {

Expand Down
43 changes: 29 additions & 14 deletions src/journal/FutureImpl.cc
Expand Up @@ -10,7 +10,7 @@ FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid)
: RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
m_commit_tid(commit_tid),
m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
m_lock("FutureImpl::m_lock", false, false), m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
}
Expand All @@ -27,36 +27,51 @@ void FutureImpl::init(const FutureImplPtr &prev_future) {
}

void FutureImpl::flush(Context *on_safe) {

bool complete;
FlushHandlerPtr flush_handler;
FlushHandlers flush_handlers;
FutureImplPtr prev_future;
{
Mutex::Locker locker(m_lock);
complete = (m_safe && m_consistent);
if (!complete) {
if (on_safe != NULL) {
if (on_safe != nullptr) {
m_contexts.push_back(on_safe);
}

if (m_flush_state == FLUSH_STATE_NONE) {
m_flush_state = FLUSH_STATE_REQUESTED;
flush_handler = m_flush_handler;

// walk the chain backwards up to <splay width> futures
if (m_prev_future) {
m_prev_future->flush();
}
}
prev_future = prepare_flush(&flush_handlers);
}
}

// instruct prior futures to flush as well
while (prev_future) {
Mutex::Locker locker(prev_future->m_lock);
prev_future = prev_future->prepare_flush(&flush_handlers);
}

if (complete && on_safe != NULL) {
on_safe->complete(m_return_value);
} else if (flush_handler) {
} else if (!flush_handlers.empty()) {
// attached to journal object -- instruct it to flush all entries through
// this one. possible to become detached while lock is released, so flush
// will be re-requested by the object if it doesn't own the future
flush_handler->flush(this);
for (auto &pair : flush_handlers) {
pair.first->flush(pair.second);
}
}
}

FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
assert(m_lock.is_locked());

if (m_flush_state == FLUSH_STATE_NONE) {
m_flush_state = FLUSH_STATE_REQUESTED;

if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) {
flush_handlers->insert({m_flush_handler, this});
}
}
return m_prev_future;
}

void FutureImpl::wait(Context *on_safe) {
Expand Down
4 changes: 4 additions & 0 deletions src/journal/FutureImpl.h
Expand Up @@ -9,6 +9,7 @@
#include "common/RefCountedObj.h"
#include "journal/Future.h"
#include <list>
#include <map>
#include <boost/noncopyable.hpp>
#include <boost/intrusive_ptr.hpp>
#include "include/assert.h"
Expand Down Expand Up @@ -76,6 +77,7 @@ class FutureImpl : public RefCountedObject, boost::noncopyable {
private:
friend std::ostream &operator<<(std::ostream &, const FutureImpl &);

typedef std::map<FlushHandlerPtr, FutureImplPtr> FlushHandlers;
typedef std::list<Context *> Contexts;

enum FlushState {
Expand Down Expand Up @@ -110,6 +112,8 @@ class FutureImpl : public RefCountedObject, boost::noncopyable {
C_ConsistentAck m_consistent_ack;
Contexts m_contexts;

FutureImplPtr prepare_flush(FlushHandlers *flush_handlers);

void consistent(int r);
void finish_unlock();
};
Expand Down
11 changes: 6 additions & 5 deletions src/journal/JournalMetadata.cc
Expand Up @@ -11,7 +11,7 @@

#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
#define dout_prefix *_dout << "JournalMetadata: "
#define dout_prefix *_dout << "JournalMetadata: " << this << " "

namespace journal {

Expand Down Expand Up @@ -402,9 +402,9 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
Mutex *timer_lock, librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id,
double commit_interval)
const Settings &settings)
: RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
m_client_id(client_id), m_settings(settings), m_order(0),
m_splay_width(0), m_pool_id(-1), m_initialized(false),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
Expand Down Expand Up @@ -795,7 +795,8 @@ void JournalMetadata::schedule_commit_task() {
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
m_commit_position_task_ctx = new C_CommitPositionTask(this);
m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx);
m_timer->add_event_after(m_settings.commit_interval,
m_commit_position_task_ctx);
}
}

Expand Down Expand Up @@ -1045,7 +1046,7 @@ std::ostream &operator<<(std::ostream &os,
<< "active_set=" << jm.m_active_set << ", "
<< "client_id=" << jm.m_client_id << ", "
<< "commit_tid=" << jm.m_commit_tid << ", "
<< "commit_interval=" << jm.m_commit_interval << ", "
<< "commit_interval=" << jm.m_settings.commit_interval << ", "
<< "commit_position=" << jm.m_commit_position << ", "
<< "registered_clients=" << jm.m_registered_clients << "]";
return os;
Expand Down
8 changes: 6 additions & 2 deletions src/journal/JournalMetadata.h
Expand Up @@ -14,6 +14,7 @@
#include "cls/journal/cls_journal_types.h"
#include "journal/AsyncOpTracker.h"
#include "journal/JournalMetadataListener.h"
#include "journal/Settings.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
Expand Down Expand Up @@ -44,7 +45,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {

JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
librados::IoCtx &ioctx, const std::string &oid,
const std::string &client_id, double commit_interval);
const std::string &client_id, const Settings &settings);
~JournalMetadata();

void init(Context *on_init);
Expand Down Expand Up @@ -73,6 +74,9 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
void get_tags(const boost::optional<uint64_t> &tag_class, Tags *tags,
Context *on_finish);

inline const Settings &get_settings() const {
return m_settings;
}
inline const std::string &get_client_id() const {
return m_client_id;
}
Expand Down Expand Up @@ -287,7 +291,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
CephContext *m_cct;
std::string m_oid;
std::string m_client_id;
double m_commit_interval;
Settings m_settings;

uint8_t m_order;
uint8_t m_splay_width;
Expand Down

0 comments on commit 08fe431

Please sign in to comment.