diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 9c1a3c2f6761c..87decaf2951d0 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -914,6 +914,8 @@ OPTION(filestore_update_to, OPT_INT, 1000) OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor OPTION(filestore_fd_cache_size, OPT_INT, 128) // FD lru size OPTION(filestore_fd_cache_shards, OPT_INT, 16) // FD number of shards +OPTION(filestore_ondisk_finisher_threads, OPT_INT, 1) +OPTION(filestore_apply_finisher_threads, OPT_INT, 1) OPTION(filestore_dump_file, OPT_STR, "") // file onto which store transaction dumps OPTION(filestore_kill_at, OPT_INT, 0) // inject a failure at the n'th opportunity OPTION(filestore_inject_stall, OPT_INT, 0) // artificially stall for N seconds in op queue thread diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 5b8c2b5edf39b..291d539df45b1 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -522,7 +522,6 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit basedir_fd(-1), current_fd(-1), backend(NULL), index_manager(do_update), - ondisk_finisher(g_ceph_context), lock("FileStore::lock"), force_sync(false), sync_entry_timeo_lock("sync_entry_timeo_lock"), @@ -530,9 +529,11 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit stop(false), sync_thread(this), fdcache(g_ceph_context), wbthrottle(g_ceph_context), + next_osr_id(0), throttle_ops(g_ceph_context, "filestore_ops",g_conf->filestore_queue_max_ops), throttle_bytes(g_ceph_context, "filestore_bytes",g_conf->filestore_queue_max_bytes), - op_finisher(g_ceph_context), + m_ondisk_finisher_num(g_conf->filestore_ondisk_finisher_threads), + m_apply_finisher_num(g_conf->filestore_apply_finisher_threads), op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"), op_wq(this, g_conf->filestore_op_thread_timeout, g_conf->filestore_op_thread_suicide_timeout, &op_tp), @@ -567,6 +568,18 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit m_filestore_max_inline_xattrs(0) { m_filestore_kill_at.set(g_conf->filestore_kill_at); + for (int i = 0; i < m_ondisk_finisher_num; ++i) { + ostringstream oss; + oss << "filestore-ondisk-" << i; + Finisher *f = new Finisher(g_ceph_context, oss.str()); + ondisk_finishers.push_back(f); + } + for (int i = 0; i < m_apply_finisher_num; ++i) { + ostringstream oss; + oss << "filestore-apply-" << i; + Finisher *f = new Finisher(g_ceph_context, oss.str()); + apply_finishers.push_back(f); + } ostringstream oss; oss << basedir << "/current"; @@ -617,6 +630,14 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit FileStore::~FileStore() { + for (vector::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) { + delete *it; + *it = NULL; + } + for (vector::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) { + delete *it; + *it = NULL; + } g_ceph_context->_conf->remove_observer(this); g_ceph_context->get_perfcounters_collection()->remove(logger); @@ -1632,8 +1653,12 @@ int FileStore::mount() journal_start(); op_tp.start(); - op_finisher.start(); - ondisk_finisher.start(); + for (vector::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) { + (*it)->start(); + } + for (vector::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) { + (*it)->start(); + } timer.init(); @@ -1723,8 +1748,12 @@ int FileStore::umount() if (!(generic_flags & SKIP_JOURNAL_REPLAY)) journal_write_close(); - op_finisher.stop(); - ondisk_finisher.stop(); + for (vector::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) { + (*it)->stop(); + } + for (vector::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) { + (*it)->stop(); + } if (fsid_fd >= 0) { VOID_TEMP_FAILURE_RETRY(::close(fsid_fd)); @@ -1895,10 +1924,10 @@ void FileStore::_finish_op(OpSequencer *osr) o->onreadable_sync->complete(0); } if (o->onreadable) { - op_finisher.queue(o->onreadable); + apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable); } if (!to_queue.empty()) { - op_finisher.queue(to_queue); + apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue); } delete o; } @@ -1941,7 +1970,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, osr = static_cast(posr->p.get()); dout(5) << "queue_transactions existing " << osr << " " << *osr << dendl; } else { - osr = new OpSequencer; + osr = new OpSequencer(next_osr_id.inc()); osr->set_cct(g_ceph_context); osr->parent = posr; posr->p = osr; @@ -2032,7 +2061,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, if (onreadable_sync) { onreadable_sync->complete(r); } - op_finisher.queue(onreadable, r); + apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r); submit_manager.op_submit_finish(op); apply_manager.op_apply_finish(op); @@ -2054,10 +2083,10 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk) // getting blocked behind an ondisk completion. if (ondisk) { dout(10) << " queueing ondisk " << ondisk << dendl; - ondisk_finisher.queue(ondisk); + ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk); } if (!to_queue.empty()) { - ondisk_finisher.queue(to_queue); + ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue); } } @@ -3786,7 +3815,9 @@ void FileStore::_flush_op_queue() dout(10) << "_flush_op_queue draining op tp" << dendl; op_wq.drain(); dout(10) << "_flush_op_queue waiting for apply finisher" << dendl; - op_finisher.wait_for_empty(); + for (vector::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) { + (*it)->wait_for_empty(); + } } /* @@ -3810,7 +3841,9 @@ void FileStore::flush() if (journal) journal->flush(); dout(10) << "flush draining ondisk finisher" << dendl; - ondisk_finisher.wait_for_empty(); + for (vector::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) { + (*it)->wait_for_empty(); + } } _flush_op_queue(); diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 2a04da2ac1aa2..de20354f6ac1b 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -152,8 +152,6 @@ class FileStore : public JournalingObjectStore, // ObjectMap boost::scoped_ptr object_map; - Finisher ondisk_finisher; - // helper fns int get_cdir(coll_t cid, char *s, int len); @@ -201,6 +199,7 @@ class FileStore : public JournalingObjectStore, public: Sequencer *parent; Mutex apply_lock; // for apply mutual exclusion + int id; /// get_max_uncompleted bool _get_max_uncompleted( @@ -315,10 +314,11 @@ class FileStore : public JournalingObjectStore, } } - OpSequencer() + OpSequencer(int i) : qlock("FileStore::OpSequencer::qlock", false, false), parent(0), - apply_lock("FileStore::OpSequencer::apply_lock", false, false) {} + apply_lock("FileStore::OpSequencer::apply_lock", false, false), + id(i) {} ~OpSequencer() { assert(q.empty()); } @@ -333,9 +333,13 @@ class FileStore : public JournalingObjectStore, FDCache fdcache; WBThrottle wbthrottle; + atomic_t next_osr_id; deque op_queue; Throttle throttle_ops, throttle_bytes; - Finisher op_finisher; + const int m_ondisk_finisher_num; + const int m_apply_finisher_num; + vector ondisk_finishers; + vector apply_finishers; ThreadPool op_tp; struct OpWQ : public ThreadPool::WorkQueue {