Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileStore: support multiple ondisk finish and apply finisher #6486

Merged
merged 1 commit into from Nov 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/config_opts.h
Expand Up @@ -914,6 +914,8 @@ OPTION(filestore_update_to, OPT_INT, 1000)
OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor
OPTION(filestore_fd_cache_size, OPT_INT, 128) // FD lru size
OPTION(filestore_fd_cache_shards, OPT_INT, 16) // FD number of shards
OPTION(filestore_ondisk_finisher_threads, OPT_INT, 1)
OPTION(filestore_apply_finisher_threads, OPT_INT, 1)
OPTION(filestore_dump_file, OPT_STR, "") // file onto which store transaction dumps
OPTION(filestore_kill_at, OPT_INT, 0) // inject a failure at the n'th opportunity
OPTION(filestore_inject_stall, OPT_INT, 0) // artificially stall for N seconds in op queue thread
Expand Down
61 changes: 47 additions & 14 deletions src/os/FileStore.cc
Expand Up @@ -522,17 +522,18 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
basedir_fd(-1), current_fd(-1),
backend(NULL),
index_manager(do_update),
ondisk_finisher(g_ceph_context),
lock("FileStore::lock"),
force_sync(false),
sync_entry_timeo_lock("sync_entry_timeo_lock"),
timer(g_ceph_context, sync_entry_timeo_lock),
stop(false), sync_thread(this),
fdcache(g_ceph_context),
wbthrottle(g_ceph_context),
next_osr_id(0),
throttle_ops(g_ceph_context, "filestore_ops",g_conf->filestore_queue_max_ops),
throttle_bytes(g_ceph_context, "filestore_bytes",g_conf->filestore_queue_max_bytes),
op_finisher(g_ceph_context),
m_ondisk_finisher_num(g_conf->filestore_ondisk_finisher_threads),
m_apply_finisher_num(g_conf->filestore_apply_finisher_threads),
op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
Expand Down Expand Up @@ -567,6 +568,18 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
m_filestore_max_inline_xattrs(0)
{
m_filestore_kill_at.set(g_conf->filestore_kill_at);
for (int i = 0; i < m_ondisk_finisher_num; ++i) {
ostringstream oss;
oss << "filestore-ondisk-" << i;
Finisher *f = new Finisher(g_ceph_context, oss.str());
ondisk_finishers.push_back(f);
}
for (int i = 0; i < m_apply_finisher_num; ++i) {
ostringstream oss;
oss << "filestore-apply-" << i;
Finisher *f = new Finisher(g_ceph_context, oss.str());
apply_finishers.push_back(f);
}

ostringstream oss;
oss << basedir << "/current";
Expand Down Expand Up @@ -617,6 +630,14 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit

FileStore::~FileStore()
{
for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
delete *it;
*it = NULL;
}
for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
delete *it;
*it = NULL;
}
g_ceph_context->_conf->remove_observer(this);
g_ceph_context->get_perfcounters_collection()->remove(logger);

Expand Down Expand Up @@ -1632,8 +1653,12 @@ int FileStore::mount()
journal_start();

op_tp.start();
op_finisher.start();
ondisk_finisher.start();
for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
(*it)->start();
}
for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
(*it)->start();
}

timer.init();

Expand Down Expand Up @@ -1723,8 +1748,12 @@ int FileStore::umount()
if (!(generic_flags & SKIP_JOURNAL_REPLAY))
journal_write_close();

op_finisher.stop();
ondisk_finisher.stop();
for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
(*it)->stop();
}
for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
(*it)->stop();
}

if (fsid_fd >= 0) {
VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
Expand Down Expand Up @@ -1895,10 +1924,10 @@ void FileStore::_finish_op(OpSequencer *osr)
o->onreadable_sync->complete(0);
}
if (o->onreadable) {
op_finisher.queue(o->onreadable);
apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
}
if (!to_queue.empty()) {
op_finisher.queue(to_queue);
apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
}
delete o;
}
Expand Down Expand Up @@ -1941,7 +1970,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
osr = static_cast<OpSequencer *>(posr->p.get());
dout(5) << "queue_transactions existing " << osr << " " << *osr << dendl;
} else {
osr = new OpSequencer;
osr = new OpSequencer(next_osr_id.inc());
osr->set_cct(g_ceph_context);
osr->parent = posr;
posr->p = osr;
Expand Down Expand Up @@ -2032,7 +2061,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
if (onreadable_sync) {
onreadable_sync->complete(r);
}
op_finisher.queue(onreadable, r);
apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);

submit_manager.op_submit_finish(op);
apply_manager.op_apply_finish(op);
Expand All @@ -2054,10 +2083,10 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
// getting blocked behind an ondisk completion.
if (ondisk) {
dout(10) << " queueing ondisk " << ondisk << dendl;
ondisk_finisher.queue(ondisk);
ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
}
if (!to_queue.empty()) {
ondisk_finisher.queue(to_queue);
ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
}
}

Expand Down Expand Up @@ -3786,7 +3815,9 @@ void FileStore::_flush_op_queue()
dout(10) << "_flush_op_queue draining op tp" << dendl;
op_wq.drain();
dout(10) << "_flush_op_queue waiting for apply finisher" << dendl;
op_finisher.wait_for_empty();
for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
(*it)->wait_for_empty();
}
}

/*
Expand All @@ -3810,7 +3841,9 @@ void FileStore::flush()
if (journal)
journal->flush();
dout(10) << "flush draining ondisk finisher" << dendl;
ondisk_finisher.wait_for_empty();
for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
(*it)->wait_for_empty();
}
}

_flush_op_queue();
Expand Down
14 changes: 9 additions & 5 deletions src/os/FileStore.h
Expand Up @@ -152,8 +152,6 @@ class FileStore : public JournalingObjectStore,
// ObjectMap
boost::scoped_ptr<ObjectMap> object_map;

Finisher ondisk_finisher;

// helper fns
int get_cdir(coll_t cid, char *s, int len);

Expand Down Expand Up @@ -201,6 +199,7 @@ class FileStore : public JournalingObjectStore,
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
int id;

/// get_max_uncompleted
bool _get_max_uncompleted(
Expand Down Expand Up @@ -315,10 +314,11 @@ class FileStore : public JournalingObjectStore,
}
}

OpSequencer()
OpSequencer(int i)
: qlock("FileStore::OpSequencer::qlock", false, false),
parent(0),
apply_lock("FileStore::OpSequencer::apply_lock", false, false) {}
apply_lock("FileStore::OpSequencer::apply_lock", false, false),
id(i) {}
~OpSequencer() {
assert(q.empty());
}
Expand All @@ -333,9 +333,13 @@ class FileStore : public JournalingObjectStore,
FDCache fdcache;
WBThrottle wbthrottle;

atomic_t next_osr_id;
deque<OpSequencer*> op_queue;
Throttle throttle_ops, throttle_bytes;
Finisher op_finisher;
const int m_ondisk_finisher_num;
const int m_apply_finisher_num;
vector<Finisher*> ondisk_finishers;
vector<Finisher*> apply_finishers;

ThreadPool op_tp;
struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
Expand Down