Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

os/bluestore: refactor bluestore_sync_submit_transaction #11537

Merged
merged 7 commits into from Nov 1, 2016
4 changes: 2 additions & 2 deletions src/common/config_opts.h
Expand Up @@ -1011,8 +1011,7 @@ OPTION(bluestore_rocksdb_options, OPT_STR, "compression=kNoCompression,max_write
OPTION(bluestore_fsck_on_mount, OPT_BOOL, false)
OPTION(bluestore_fsck_on_umount, OPT_BOOL, false)
OPTION(bluestore_fsck_on_mkfs, OPT_BOOL, true)
OPTION(bluestore_sync_transaction, OPT_BOOL, false) // perform kv txn synchronously
OPTION(bluestore_sync_submit_transaction, OPT_BOOL, false)
OPTION(bluestore_sync_submit_transaction, OPT_BOOL, true) // submit kv txn in queueing thread (not kv_sync_thread)
OPTION(bluestore_sync_wal_apply, OPT_BOOL, true) // perform initial wal work synchronously (possibly in combination with aio so we only *queue* ios)
OPTION(bluestore_wal_threads, OPT_INT, 4)
OPTION(bluestore_wal_thread_timeout, OPT_INT, 30)
Expand All @@ -1033,6 +1032,7 @@ OPTION(bluestore_debug_freelist, OPT_BOOL, false)
OPTION(bluestore_debug_prefill, OPT_FLOAT, 0)
OPTION(bluestore_debug_prefragment_max, OPT_INT, 1048576)
OPTION(bluestore_debug_inject_read_err, OPT_BOOL, false)
OPTION(bluestore_debug_randomize_serial_transaction, OPT_INT, 0)
OPTION(bluestore_inject_wal_apply_delay, OPT_FLOAT, 0)
OPTION(bluestore_shard_finishers, OPT_BOOL, false)

Expand Down
4 changes: 4 additions & 0 deletions src/os/bluestore/BitmapFreelistManager.h
Expand Up @@ -67,6 +67,10 @@ class BitmapFreelistManager : public FreelistManager {
void release(
uint64_t offset, uint64_t length,
KeyValueDB::Transaction txn) override;

bool supports_parallel_transactions() override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we inline this?

return true;
}
};

#endif
141 changes: 88 additions & 53 deletions src/os/bluestore/BlueStore.cc
Expand Up @@ -6020,11 +6020,13 @@ int BlueStore::_open_super_meta()
db->get(PREFIX_SUPER, "nid_max", &bl);
bufferlist::iterator p = bl.begin();
try {
::decode(nid_max, p);
uint64_t v;
::decode(v, p);
nid_max = v;
} catch (buffer::error& e) {
}
dout(10) << __func__ << " old nid_max " << nid_max << dendl;
nid_last = nid_max;
nid_last = nid_max.load();
}

// blobid
Expand All @@ -6034,11 +6036,13 @@ int BlueStore::_open_super_meta()
db->get(PREFIX_SUPER, "blobid_max", &bl);
bufferlist::iterator p = bl.begin();
try {
::decode(blobid_max, p);
uint64_t v;
::decode(v, p);
blobid_max = v;
} catch (buffer::error& e) {
}
dout(10) << __func__ << " old blobid_max " << blobid_max << dendl;
blobid_last = blobid_max;
blobid_last = blobid_max.load();
}

// freelist
Expand Down Expand Up @@ -6162,21 +6166,40 @@ void BlueStore::_txc_state_proc(TransContext *txc)
sb->bc.finish_write(txc->seq);
}
txc->shared_blobs_written.clear();
if (!g_conf->bluestore_sync_transaction) {
if (g_conf->bluestore_sync_submit_transaction) {
if (g_conf->bluestore_sync_submit_transaction &&
fm->supports_parallel_transactions()) {
if (txc->last_nid >= nid_max ||
txc->last_blobid >= blobid_max) {
dout(20) << __func__
<< " last_{nid,blobid} exceeds max, submit via kv thread"
<< dendl;
} else if (txc->osr->kv_committing_serially) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liewegas I think this may hurt the parallelism, may be we should guard against a number rather than even 1 ?
I will run through my test and see how frequent we are hitting this.

dout(20) << __func__ << " prior txc submitted via kv thread, us too"
<< dendl;
// note: this is starvation-prone. once we have a txc in a busy
// sequencer that is committing serially it is possible to keep
// submitting new transactions fast enough that we get stuck doing
// so. the alternative is to block here... fixme?
} else if (g_conf->bluestore_debug_randomize_serial_transaction &&
rand() % g_conf->bluestore_debug_randomize_serial_transaction
== 0) {
dout(20) << __func__ << " DEBUG randomly forcing submit via kv thread"
<< dendl;
} else {
_txc_finalize_kv(txc, txc->t);
txc->kv_submitted = true;
int r = db->submit_transaction(txc->t);
assert(r == 0);
}
} else {
_txc_finalize_kv(txc, txc->t);
int r = db->submit_transaction_sync(txc->t);
assert(r == 0);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liewegas .
I think we in kv_sync_thread, we must make sure the transaction which include new nid_max must update and then update nid_max.
Like:
nid_max_tmp =
encode(nid_max_tmp, bl)
submit_transaction(t)

nid_max = nid_max_tmp;
Avoid later txc first submit and the txc include new nid_max later submit.

{
std::lock_guard<std::mutex> l(kv_lock);
kv_queue.push_back(txc);
kv_cond.notify_one();
if (!txc->kv_submitted) {
kv_queue_unsubmitted.push_back(txc);
++txc->osr->kv_committing_serially;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liewegas What is the need of this atomic ? I can see it is just incremented and decremented , not checked anywhere.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liewegas Sorry missed the check , gave my comment above.

}
}
return;
case TransContext::STATE_KV_QUEUED:
Expand Down Expand Up @@ -6504,7 +6527,6 @@ void BlueStore::_kv_sync_thread()
std::unique_lock<std::mutex> l(kv_lock);
while (true) {
assert(kv_committing.empty());
assert(wal_cleaning.empty());
if (kv_queue.empty() && wal_cleanup_queue.empty()) {
if (kv_stop)
break;
Expand All @@ -6513,60 +6535,61 @@ void BlueStore::_kv_sync_thread()
kv_cond.wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
deque<TransContext*> kv_submitting;
deque<TransContext*> wal_cleaning;
dout(20) << __func__ << " committing " << kv_queue.size()
<< " submitting " << kv_queue_unsubmitted.size()
<< " cleaning " << wal_cleanup_queue.size() << dendl;
kv_committing.swap(kv_queue);
kv_submitting.swap(kv_queue_unsubmitted);
wal_cleaning.swap(wal_cleanup_queue);
utime_t start = ceph_clock_now(NULL);
l.unlock();

dout(30) << __func__ << " committing txc " << kv_committing << dendl;
dout(30) << __func__ << " submitting txc " << kv_submitting << dendl;
dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;

alloc->commit_start();

// flush/barrier on block device
bdev->flush();

uint64_t high_nid = 0, high_blobid = 0;
if (!g_conf->bluestore_sync_transaction &&
!g_conf->bluestore_sync_submit_transaction) {
for (auto txc : kv_committing) {
_txc_finalize_kv(txc, txc->t);
if (txc->last_nid > high_nid) {
high_nid = txc->last_nid;
}
if (txc->last_blobid > high_blobid) {
high_blobid = txc->last_blobid;
}
txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
}
if (!kv_committing.empty()) {
TransContext *first_txc = kv_committing.front();
std::lock_guard<std::mutex> l(id_lock);
if (high_nid + g_conf->bluestore_nid_prealloc/2 > nid_max) {
nid_max = high_nid + g_conf->bluestore_nid_prealloc;
bufferlist bl;
::encode(nid_max, bl);
first_txc->t->set(PREFIX_SUPER, "nid_max", bl);
dout(10) << __func__ << " nid_max now " << nid_max << dendl;
}
if (high_blobid + g_conf->bluestore_blobid_prealloc/2 > blobid_max) {
blobid_max = high_blobid + g_conf->bluestore_blobid_prealloc;
bufferlist bl;
::encode(blobid_max, bl);
first_txc->t->set(PREFIX_SUPER, "blobid_max", bl);
dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
}
}
for (auto txc : kv_committing) {
int r = db->submit_transaction(txc->t);
assert(r == 0);
}
// we will use one final transaction to force a sync
KeyValueDB::Transaction synct = db->get_transaction();

// increase {nid,blobid}_max? note that this covers both the
// case wehre we are approaching the max and the case we passed
// it. in either case, we increase the max in the earlier txn
// we submit.
uint64_t new_nid_max = 0, new_blobid_max = 0;
if (nid_last + g_conf->bluestore_nid_prealloc/2 > nid_max) {
KeyValueDB::Transaction t =
kv_submitting.empty() ? synct : kv_submitting.front()->t;
new_nid_max = nid_last + g_conf->bluestore_nid_prealloc;
bufferlist bl;
::encode(new_nid_max, bl);
t->set(PREFIX_SUPER, "nid_max", bl);
dout(10) << __func__ << " new_nid_max " << new_nid_max << dendl;
}
if (blobid_last + g_conf->bluestore_blobid_prealloc/2 > blobid_max) {
KeyValueDB::Transaction t =
kv_submitting.empty() ? synct : kv_submitting.front()->t;
new_blobid_max = blobid_last + g_conf->bluestore_blobid_prealloc;
bufferlist bl;
::encode(new_blobid_max, bl);
t->set(PREFIX_SUPER, "blobid_max", bl);
dout(10) << __func__ << " new_blobid_max " << new_blobid_max << dendl;
}
for (auto txc : kv_submitting) {
assert(!txc->kv_submitted);
_txc_finalize_kv(txc, txc->t);
txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
int r = db->submit_transaction(txc->t);
assert(r == 0);
--txc->osr->kv_committing_serially;
txc->kv_submitted = true;
}

// one final transaction to force a sync
KeyValueDB::Transaction t = db->get_transaction();

vector<bluestore_pextent_t> bluefs_gift_extents;
if (bluefs) {
Expand All @@ -6580,7 +6603,7 @@ void BlueStore::_kv_sync_thread()
::encode(bluefs_extents, bl);
dout(10) << __func__ << " bluefs_extents now 0x" << std::hex
<< bluefs_extents << std::dec << dendl;
t->set(PREFIX_SUPER, "bluefs_extents", bl);
synct->set(PREFIX_SUPER, "bluefs_extents", bl);
}
}

Expand All @@ -6590,22 +6613,34 @@ void BlueStore::_kv_sync_thread()
++it) {
bluestore_wal_transaction_t& wt =*(*it)->wal_txn;
// kv metadata updates
_txc_finalize_kv(*it, t);
_txc_finalize_kv(*it, synct);
// cleanup the wal
string key;
get_wal_key(wt.seq, &key);
t->rm_single_key(PREFIX_WAL, key);
synct->rm_single_key(PREFIX_WAL, key);
}
int r = db->submit_transaction_sync(t);

// submit synct synchronously (block and wait for it to commit)
int r = db->submit_transaction_sync(synct);
assert(r == 0);

if (new_nid_max) {
nid_max = new_nid_max;
dout(10) << __func__ << " nid_max now " << nid_max << dendl;
}
if (new_blobid_max) {
blobid_max = new_blobid_max;
dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
}

utime_t finish = ceph_clock_now(NULL);
utime_t dur = finish - start;
dout(20) << __func__ << " committed " << kv_committing.size()
<< " cleaned " << wal_cleaning.size()
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
TransContext *txc = kv_committing.front();
assert(txc->kv_submitted);
_txc_state_proc(txc);
kv_committing.pop_front();
}
Expand Down
15 changes: 10 additions & 5 deletions src/os/bluestore/BlueStore.h
Expand Up @@ -1160,6 +1160,8 @@ class BlueStore : public ObjectStore,
bluestore_wal_transaction_t *wal_txn; ///< wal transaction (if any)
vector<OnodeRef> wal_op_onodes;

bool kv_submitted = false; ///< true when we've been submitted to kv db

interval_set<uint64_t> allocated, released;
struct volatile_statfs{
enum {
Expand Down Expand Up @@ -1275,6 +1277,8 @@ class BlueStore : public ObjectStore,

uint64_t last_seq = 0;

std::atomic_int kv_committing_serially = {0};

OpSequencer()
//set the qlock to PTHREAD_MUTEX_RECURSIVE mode
: parent(NULL) {
Expand Down Expand Up @@ -1429,11 +1433,10 @@ class BlueStore : public ObjectStore,

vector<Cache*> cache_shards;

std::mutex id_lock;
std::atomic<uint64_t> nid_last = {0};
uint64_t nid_max = 0;
std::atomic<uint64_t> nid_max = {0};
std::atomic<uint64_t> blobid_last = {0};
uint64_t blobid_max = 0;
std::atomic<uint64_t> blobid_max = {0};

Throttle throttle_ops, throttle_bytes; ///< submit to commit
Throttle throttle_wal_ops, throttle_wal_bytes; ///< submit to wal complete
Expand All @@ -1452,8 +1455,10 @@ class BlueStore : public ObjectStore,
std::mutex kv_lock;
std::condition_variable kv_cond, kv_sync_cond;
bool kv_stop;
deque<TransContext*> kv_queue, kv_committing;
deque<TransContext*> wal_cleanup_queue, wal_cleaning;
deque<TransContext*> kv_queue; ///< ready, already submitted
deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
deque<TransContext*> kv_committing; ///< currently syncing
deque<TransContext*> wal_cleanup_queue; ///< wal done, ready for cleanup

PerfCounters *logger;

Expand Down
4 changes: 4 additions & 0 deletions src/os/bluestore/FreelistManager.h
Expand Up @@ -38,6 +38,10 @@ class FreelistManager {
virtual void release(
uint64_t offset, uint64_t length,
KeyValueDB::Transaction txn) = 0;

virtual bool supports_parallel_transactions() {
return false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liewegas . I don't know for extentfreelistmanager why don't support parallel transaction? Hope your explain. Thanks!

};


Expand Down