Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

os/bluestore: add multiple finishers to bluestore #10567

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -1009,6 +1009,7 @@ OPTION(bluestore_debug_freelist, OPT_BOOL, false)
OPTION(bluestore_debug_prefill, OPT_FLOAT, 0)
OPTION(bluestore_debug_prefragment_max, OPT_INT, 1048576)
OPTION(bluestore_inject_wal_apply_delay, OPT_FLOAT, 0)
OPTION(bluestore_use_multiple_finishers, OPT_BOOL, false)

OPTION(kstore_max_ops, OPT_U64, 512)
OPTION(kstore_max_bytes, OPT_U64, 64*1024*1024)
Expand Down
3 changes: 2 additions & 1 deletion src/os/ObjectStore.h
Expand Up @@ -179,10 +179,11 @@ class ObjectStore {
*/
struct Sequencer {
string name;
spg_t shard_hint;
Sequencer_implRef p;

explicit Sequencer(string n)
: name(n), p(NULL) {}
: name(n), shard_hint(spg_t()), p(NULL) {}
~Sequencer() {
}

Expand Down
46 changes: 35 additions & 11 deletions src/os/bluestore/BlueStore.cc
Expand Up @@ -1380,7 +1380,7 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
cct->_conf->bluestore_wal_thread_timeout,
cct->_conf->bluestore_wal_thread_suicide_timeout,
&wal_tp),
finisher(cct),
m_finisher_num(1),
kv_sync_thread(this),
kv_stop(false),
logger(NULL),
Expand All @@ -1390,10 +1390,26 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
_init_logger();
g_ceph_context->_conf->add_observer(this);
set_cache_shards(1);

if (cct->_conf->bluestore_use_multiple_finishers) {
m_finisher_num = cct->_conf->osd_op_num_shards;
}

for (int i = 0; i < m_finisher_num; ++i) {
ostringstream oss;
oss << "finisher-" << i;
Finisher *f = new Finisher(cct, oss.str(), "finisher");
finishers.push_back(f);
}
}

BlueStore::~BlueStore()
{
for (auto f : finishers) {
delete f;
f = NULL;
}

g_ceph_context->_conf->remove_observer(this);
_shutdown_logger();
assert(!mounted);
Expand Down Expand Up @@ -2762,7 +2778,9 @@ int BlueStore::mount()
goto out_coll;
}

finisher.start();
for (auto f : finishers) {
f->start();
}
wal_tp.start();
kv_sync_thread.create("bstore_kv_sync");

Expand All @@ -2780,8 +2798,10 @@ int BlueStore::mount()
_kv_stop();
wal_wq.drain();
wal_tp.stop();
finisher.wait_for_empty();
finisher.stop();
for (auto f : finishers) {
f->wait_for_empty();
f->stop();
}
out_coll:
coll_map.clear();
out_alloc:
Expand Down Expand Up @@ -2814,10 +2834,12 @@ int BlueStore::umount()
wal_wq.drain();
dout(20) << __func__ << " stopping wal_tp" << dendl;
wal_tp.stop();
dout(20) << __func__ << " draining finisher" << dendl;
finisher.wait_for_empty();
dout(20) << __func__ << " stopping finisher" << dendl;
finisher.stop();
for (auto f : finishers) {
dout(20) << __func__ << " draining finisher" << dendl;
f->wait_for_empty();
dout(20) << __func__ << " stopping finisher" << dendl;
f->stop();
}
dout(20) << __func__ << " closing" << dendl;

mounted = false;
Expand Down Expand Up @@ -4818,16 +4840,18 @@ void BlueStore::_txc_finish_kv(TransContext *txc)
txc->onreadable_sync->complete(0);
txc->onreadable_sync = NULL;
}
unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num);
if (txc->onreadable) {
finisher.queue(txc->onreadable);
finishers[n]->queue(txc->onreadable);
txc->onreadable = NULL;
}
if (txc->oncommit) {
finisher.queue(txc->oncommit);
finishers[n]->queue(txc->oncommit);
txc->oncommit = NULL;
}
while (!txc->oncommits.empty()) {
finisher.queue(txc->oncommits.front());
auto f = txc->oncommits.front();
finishers[n]->queue(f);
txc->oncommits.pop_front();
}

Expand Down
3 changes: 2 additions & 1 deletion src/os/bluestore/BlueStore.h
Expand Up @@ -1139,7 +1139,8 @@ class BlueStore : public ObjectStore,
ThreadPool wal_tp;
WALWQ wal_wq;

Finisher finisher;
int m_finisher_num;
vector<Finisher*> finishers;

KVSyncThread kv_sync_thread;
std::mutex kv_lock;
Expand Down
1 change: 1 addition & 0 deletions src/osd/PG.cc
Expand Up @@ -249,6 +249,7 @@ PG::PG(OSDService *o, OSDMapRef curmap,
#ifdef PG_DEBUG_REFS
osd->add_pgid(p, this);
#endif
osr->shard_hint = p;
}

PG::~PG()
Expand Down