Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

os/bluestore: multithreaded '_kv_sync_thread' #10856

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ OPTION(bluestore_debug_prefill, OPT_FLOAT, 0)
OPTION(bluestore_debug_prefragment_max, OPT_INT, 1048576)
OPTION(bluestore_inject_wal_apply_delay, OPT_FLOAT, 0)
OPTION(bluestore_shard_finishers, OPT_BOOL, false)
// Number of kv sync threads BlueStore creates; the BlueStore constructor
// allocates one set of queues, one mutex and one condition variable per thread.
// NOTE(review): the value is also used as a modulus when picking a queue for a
// transaction, so it presumably has to be >= 1 -- confirm it is validated.
OPTION(bluestore_kv_sync_threads, OPT_INT, 4)

OPTION(kstore_max_ops, OPT_U64, 512)
OPTION(kstore_max_bytes, OPT_U64, 64*1024*1024)
Expand Down
124 changes: 81 additions & 43 deletions src/os/bluestore/BlueStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1403,11 +1403,13 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
cct->_conf->bluestore_wal_thread_suicide_timeout,
&wal_tp),
m_finisher_num(1),
kv_sync_thread(this),
kv_stop(false),
logger(NULL),
csum_type(bluestore_blob_t::CSUM_CRC32C),
sync_wal_apply(cct->_conf->bluestore_sync_wal_apply)
sync_wal_apply(cct->_conf->bluestore_sync_wal_apply),
kv_threads(cct->_conf->bluestore_kv_sync_threads),
osr_id(0),
kv_sync_thread(this, 0)
{
_init_logger();
g_ceph_context->_conf->add_observer(this);
Expand All @@ -1423,6 +1425,18 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
Finisher *f = new Finisher(cct, oss.str(), "finisher");
finishers.push_back(f);
}

for (int i = 0; i < kv_threads; i++) {
kv_queues.push_back(new deque<TransContext*>);
kv_committings.push_back(new deque<TransContext*>);
wal_cleanup_queues.push_back(new deque<TransContext*>);
wal_cleanings.push_back(new deque<TransContext*>);

kv_sync_threads.push_back(new KVSyncThread(this, i));

kv_locks.push_back(new std::mutex);
kv_conds.push_back(new std::condition_variable);
}
}

BlueStore::~BlueStore()
Expand Down Expand Up @@ -2846,8 +2860,10 @@ int BlueStore::mount()
f->start();
}
wal_tp.start();
kv_sync_thread.create("bstore_kv_sync");

//kv_sync_thread.create("bstore_kv_sync");
for (int i = 0; i < kv_threads; i++) {
kv_sync_threads[i]->create("kv_sync");
}
r = _wal_replay();
if (r < 0)
goto out_stop;
Expand Down Expand Up @@ -3400,13 +3416,14 @@ void BlueStore::_sync()
// flush aios in flight
bdev->flush();

std::unique_lock<std::mutex> l(kv_lock);
while (!kv_committing.empty() ||
!kv_queue.empty()) {
dout(20) << " waiting for kv to commit" << dendl;
kv_sync_cond.wait(l);
for (int i = 0; i < kv_threads; i++) {
std::unique_lock<std::mutex> l(*kv_locks[i]);
while(!kv_committings[i]->empty() ||
!kv_queues[i]->empty()) {
dout(20) << " waiting for kv to commit" << dendl;
kv_sync_cond.wait(l);
}
}

dout(10) << __func__ << " done" << dendl;
}

Expand Down Expand Up @@ -4835,9 +4852,10 @@ void BlueStore::_txc_state_proc(TransContext *txc)
assert(r == 0);
}
{
std::lock_guard<std::mutex> l(kv_lock);
kv_queue.push_back(txc);
kv_cond.notify_one();
std::lock_guard<std::mutex> l(*kv_locks[txc->osr->id % kv_threads]);
//kv_queue.push_back(txc);
kv_queues[txc->osr->id % kv_threads]->push_back(txc);
kv_conds[txc->osr->id % kv_threads]->notify_one();
}
return;
case TransContext::STATE_KV_QUEUED:
Expand Down Expand Up @@ -5124,40 +5142,37 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
_txc_update_store_statfs(txc);
}

void BlueStore::_kv_sync_thread()
void BlueStore::_kv_sync_thread(int id)
{
dout(10) << __func__ << " start" << dendl;
std::unique_lock<std::mutex> l(kv_lock);
dout(10) << __func__ << " start" << " id " << id << dendl;
std::unique_lock<std::mutex> l(*kv_locks[id]);
while (true) {
assert(kv_committing.empty());
assert(wal_cleaning.empty());
if (kv_queue.empty() && wal_cleanup_queue.empty()) {
assert(kv_committings[id]->empty());
assert(wal_cleanings[id]->empty());
if (kv_queues[id]->empty() && wal_cleanup_queues[id]->empty()) {
if (kv_stop)
break;
dout(20) << __func__ << " sleep" << dendl;
kv_sync_cond.notify_all();
kv_cond.wait(l);
kv_conds[id]->wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
dout(20) << __func__ << " committing " << kv_queue.size()
<< " cleaning " << wal_cleanup_queue.size() << dendl;
kv_committing.swap(kv_queue);
wal_cleaning.swap(wal_cleanup_queue);
dout(20) << __func__ << " committing " << kv_queues[id]->size()
<< " cleaning " << wal_cleanup_queues[id]->size() << dendl;
kv_committings[id]->swap(*(kv_queues[id]));
wal_cleanings[id]->swap(*(wal_cleanup_queues[id]));
utime_t start = ceph_clock_now(NULL);
l.unlock();

dout(30) << __func__ << " committing txc " << kv_committing << dendl;
dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;

alloc->commit_start();

// flush/barrier on block device
bdev->flush();

if (!g_conf->bluestore_sync_transaction &&
!g_conf->bluestore_sync_submit_transaction) {
for (std::deque<TransContext *>::iterator it = kv_committing.begin();
it != kv_committing.end();
for (std::deque<TransContext *>::iterator it = kv_committings[id]->begin();
it != kv_committings[id]->end();
++it) {
_txc_finalize_kv((*it), (*it)->t);
int r = db->submit_transaction((*it)->t);
Expand All @@ -5184,9 +5199,31 @@ void BlueStore::_kv_sync_thread()
}
}

// allocations and deallocations
for (std::deque<TransContext *>::iterator it = wal_cleanings[id]->begin();
it != wal_cleanings[id]->end();
++it) {
TransContext *txc = *it;
if (!txc->wal_txn->released.empty()) {
dout(20) << __func__ << " txc " << txc
<< " (post-wal) released " << txc->wal_txn->released
<< dendl;
for (interval_set<uint64_t>::iterator p =
txc->wal_txn->released.begin();
p != txc->wal_txn->released.end();
++p) {
dout(20) << __func__ << " release " << p.get_start()
<< "~" << p.get_len() << dendl;
fm->release(p.get_start(), p.get_len(), t);
if (!g_conf->bluestore_debug_no_reuse_blocks)
alloc->release(p.get_start(), p.get_len());
}
}
}

// cleanup sync wal keys
for (std::deque<TransContext *>::iterator it = wal_cleaning.begin();
it != wal_cleaning.end();
for (std::deque<TransContext *>::iterator it = wal_cleanings[id]->begin();
it != wal_cleanings[id]->end();
++it) {
bluestore_wal_transaction_t& wt =*(*it)->wal_txn;
// kv metadata updates
Expand All @@ -5201,18 +5238,16 @@ void BlueStore::_kv_sync_thread()

utime_t finish = ceph_clock_now(NULL);
utime_t dur = finish - start;
dout(20) << __func__ << " committed " << kv_committing.size()
<< " cleaned " << wal_cleaning.size()
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
TransContext *txc = kv_committing.front();

while (!kv_committings[id]->empty()) {
TransContext *txc = kv_committings[id]->front();
_txc_state_proc(txc);
kv_committing.pop_front();
kv_committings[id]->pop_front();
}
while (!wal_cleaning.empty()) {
TransContext *txc = wal_cleaning.front();
while (!wal_cleanings[id]->empty()) {
TransContext *txc = wal_cleanings[id]->front();
_txc_state_proc(txc);
wal_cleaning.pop_front();
wal_cleanings[id]->pop_front();
}

alloc->commit_finish();
Expand Down Expand Up @@ -5282,11 +5317,12 @@ int BlueStore::_wal_finish(TransContext *txc)
assert(txc->wal_txn->released.empty());

std::lock_guard<std::mutex> l2(txc->osr->qlock);
std::lock_guard<std::mutex> l(kv_lock);
std::lock_guard<std::mutex> l(*kv_locks[txc->osr->id % kv_threads]);
txc->state = TransContext::STATE_WAL_CLEANUP;
txc->osr->qcond.notify_all();
wal_cleanup_queue.push_back(txc);
kv_cond.notify_one();
//wal_cleanup_queue.push_back(txc);
wal_cleanup_queues[txc->osr->id % kv_threads]->push_back(txc);
kv_conds[txc->osr->id % kv_threads]->notify_one();
return 0;
}

Expand Down Expand Up @@ -5319,6 +5355,7 @@ int BlueStore::_wal_replay()
{
dout(10) << __func__ << " start" << dendl;
OpSequencerRef osr = new OpSequencer;
osr->id = 0;
int count = 0;
KeyValueDB::Iterator it = db->get_iterator(PREFIX_WAL);
for (it->lower_bound(string()); it->valid(); it->next(), ++count) {
Expand Down Expand Up @@ -5371,6 +5408,7 @@ int BlueStore::queue_transactions(
osr = new OpSequencer;
osr->parent = posr;
posr->p = osr;
osr->id = osr_id.inc();
dout(10) << __func__ << " new " << osr << " " << *osr << dendl;
}

Expand Down
28 changes: 20 additions & 8 deletions src/os/bluestore/BlueStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ class BlueStore : public ObjectStore,
std::mutex wal_apply_mutex;

uint64_t last_seq = 0;

int id = 0;
OpSequencer()
//set the qlock to PTHREAD_MUTEX_RECURSIVE mode
: parent(NULL) {
Expand Down Expand Up @@ -1098,9 +1098,10 @@ class BlueStore : public ObjectStore,

// Thread object that runs BlueStore's kv sync loop on behalf of a store.
// NOTE(review): this listing is a diff view without +/- markers; it appears to
// show both the previous single-argument constructor / _kv_sync_thread() call
// and the new (store, id) constructor / _kv_sync_thread(id) call -- confirm
// against the applied patch which lines survive.
struct KVSyncThread : public Thread {
// Store whose _kv_sync_thread() is invoked from entry().
BlueStore *store;
explicit KVSyncThread(BlueStore *s) : store(s) {}
// Index handed through to BlueStore::_kv_sync_thread(int), which uses it to
// select this thread's lock, condition variable and queues.
int id;
explicit KVSyncThread(BlueStore *s, int id) : store(s), id(id) {}
// Calls into the store's sync loop and returns NULL once that call returns.
// Presumably the override of Thread's entry hook -- Thread is declared
// elsewhere, so verify there.
void *entry() {
store->_kv_sync_thread();
store->_kv_sync_thread(id);
return NULL;
}
};
Expand Down Expand Up @@ -1144,11 +1145,16 @@ class BlueStore : public ObjectStore,
vector<Finisher*> finishers;

KVSyncThread kv_sync_thread;
vector<KVSyncThread *> kv_sync_threads;
std::mutex kv_lock;
std::condition_variable kv_cond, kv_sync_cond;
vector<std::mutex *> kv_locks;
vector<std::condition_variable *> kv_conds;
bool kv_stop;
deque<TransContext*> kv_queue, kv_committing;
deque<TransContext*> wal_cleanup_queue, wal_cleaning;
vector<deque<TransContext*> *> kv_queues, kv_committings;
vector<deque<TransContext*> *> wal_cleanup_queues, wal_cleanings;

PerfCounters *logger;

Expand Down Expand Up @@ -1189,6 +1195,9 @@ class BlueStore : public ObjectStore,
uint64_t comp_min_blob_size = 0;
uint64_t comp_max_blob_size = 0;

atomic_t osr_id;
int kv_threads;

// --------------------------------------------------------
// private methods

Expand Down Expand Up @@ -1258,14 +1267,17 @@ class BlueStore : public ObjectStore,

void _osr_reap_done(OpSequencer *osr);

void _kv_sync_thread();
void _kv_sync_thread(int id);
// Stops the kv sync threads: sets kv_stop, wakes the waiting thread(s) through
// their condition variables, joins them, and finally clears kv_stop again.
// NOTE(review): this listing is a diff view without +/- markers, so the old
// single-thread statements (kv_lock / kv_cond / kv_sync_thread.join()) and the
// new per-thread loops appear interleaved; confirm which lines remain.
void _kv_stop() {
{
std::lock_guard<std::mutex> l(kv_lock);
// Per-thread variant: take each thread's own lock before signalling it.
// NOTE(review): kv_stop is one shared flag, but here it is written while
// holding only kv_locks[i], whereas _kv_sync_thread(id) reads it while holding
// kv_locks[id]; for id != i there looks to be no common lock -- confirm this
// is safe or make the flag atomic.
for (int i = 0; i < kv_threads; i++) {
std::lock_guard<std::mutex> l(*kv_locks[i]);
kv_stop = true;
kv_cond.notify_all();
kv_conds[i]->notify_all();
}
//kv_sync_thread.join();
// Wait for every per-index sync thread to leave its loop.
for (int i = 0; i < kv_threads; i++) {
kv_sync_threads[i]->join();
}
kv_sync_thread.join();
// Reset the flag after the joins above have returned.
kv_stop = false;
}

Expand Down
8 changes: 5 additions & 3 deletions src/os/bluestore/StupidAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,11 @@ void StupidAllocator::commit_start()
std::lock_guard<std::mutex> l(lock);
dout(10) << __func__ << " releasing " << num_uncommitted
<< " in extents " << uncommitted.num_intervals() << dendl;
assert(committing.empty());
committing.swap(uncommitted);
num_committing = num_uncommitted;
for (auto p = uncommitted.begin(); p != uncommitted.end(); ++p) {
committing.insert(p.get_start(), p.get_len());
}
uncommitted.clear();
num_committing += num_uncommitted;
num_uncommitted = 0;
}

Expand Down