bluestore_no_finisher option was added
- It must be used together with the bluestore_rtc option
- The Context::HAVE_PGLOCK constant was added so context callbacks can run without taking the pg lock (see the sketch below)
- The option is intended for machines with a small number of cores
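
A minimal, self-contained sketch of the HAVE_PGLOCK convention this commit applies throughout the OSD callbacks. The PG and Context types here are stripped-down stand-ins for the real Ceph classes, not part of the commit:

#include <iostream>

// Stand-ins for the real Ceph classes; only what the sketch needs.
struct Context {
  static const int HAVE_PGLOCK = 256;
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  virtual void complete(int r) { finish(r); delete this; }
};

struct PG {
  void lock()   { std::cout << "pg->lock()" << std::endl; }
  void unlock() { std::cout << "pg->unlock()" << std::endl; }
};

struct C_Example : public Context {
  PG *pg;
  explicit C_Example(PG *p) : pg(p) {}
  void finish(int r) override {
    if (r != Context::HAVE_PGLOCK)
      pg->lock();              // take the pg lock only if the caller did not
    // ... work that requires the pg lock ...
    if (r != Context::HAVE_PGLOCK)
      pg->unlock();
  }
};

int main() {
  PG pg;
  (new C_Example(&pg))->complete(0);                    // finisher-thread path: callback locks itself
  (new C_Example(&pg))->complete(Context::HAVE_PGLOCK); // caller already holds the pg lock
  return 0;
}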

Signed-off-by: Taeksang Kim <voidbag@gmail.com>
voidbag committed Nov 8, 2016
1 parent 11fcce2 commit f3879f5
Showing 8 changed files with 119 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/common/config_opts.h
@@ -1051,6 +1051,7 @@ OPTION(bluestore_debug_randomize_serial_transaction, OPT_INT, 0)
OPTION(bluestore_inject_wal_apply_delay, OPT_FLOAT, 0)
OPTION(bluestore_shard_finishers, OPT_BOOL, false)
OPTION(bluestore_rtc, OPT_BOOL, false)
OPTION(bluestore_no_finisher, OPT_BOOL, false)

OPTION(kstore_max_ops, OPT_U64, 512)
OPTION(kstore_max_bytes, OPT_U64, 64*1024*1024)
1 change: 1 addition & 0 deletions src/include/Context.h
@@ -58,6 +58,7 @@ class Context {
virtual void finish(int r) = 0;

public:
static const int HAVE_PGLOCK = 256;
Context() {}
virtual ~Context() {} // we want a virtual destructor!!!
virtual void complete(int r) {
33 changes: 24 additions & 9 deletions src/os/bluestore/BlueStore.cc
@@ -6525,14 +6525,27 @@ void BlueStore::_txc_finish_kv(TransContext *txc)
finishers[n]->queue(txc->onreadable);
txc->onreadable = NULL;
}
if (txc->oncommit) {
finishers[n]->queue(txc->oncommit);
txc->oncommit = NULL;
}
while (!txc->oncommits.empty()) {
auto f = txc->oncommits.front();
finishers[n]->queue(f);
txc->oncommits.pop_front();
if (g_conf->bluestore_no_finisher && !txc->is_pipelined_io &&
txc->osr->is_finisher_done()) {
if (txc->oncommit) {
txc->oncommit->complete(Context::HAVE_PGLOCK);
txc->oncommit = NULL;
}
while (!txc->oncommits.empty()) {
auto f = txc->oncommits.front();
f->complete(Context::HAVE_PGLOCK);
txc->oncommits.pop_front();
}
} else {
if (txc->oncommit) {
finishers[n]->queue(txc->oncommit);
txc->oncommit = NULL;
}
while (!txc->oncommits.empty()) {
auto f = txc->oncommits.front();
finishers[n]->queue(f);
txc->oncommits.pop_front();
}
}

--txc->osr->kv_finisher_submitting;
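
A compact sketch of the dispatch decision the hunk above introduces: when the finisher can safely be bypassed (in the commit, when bluestore_no_finisher is set, the I/O was not pipelined, and osr->is_finisher_done() holds), the oncommit callback is completed inline with HAVE_PGLOCK so it skips taking the pg lock; otherwise it is queued on the finisher as before. The Finisher stub and the helper name below are illustrative, not the actual BlueStore code:

#include <deque>

struct Context {
  static const int HAVE_PGLOCK = 256;
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  virtual void complete(int r) { finish(r); delete this; }
};

// Stub: the real Finisher hands queued contexts to a worker thread.
struct Finisher {
  std::deque<Context*> q;
  void queue(Context *c) { q.push_back(c); }
};

// Hypothetical helper mirroring the branch in _txc_finish_kv() above.
void dispatch_oncommit(Context *oncommit, bool bypass_finisher, Finisher &f) {
  if (bypass_finisher)
    oncommit->complete(Context::HAVE_PGLOCK);  // run inline; callback skips pg->lock()
  else
    f.queue(oncommit);                         // hand off to the finisher thread
}

int main() {
  Finisher f;
  struct Noop : Context { void finish(int) override {} };
  dispatch_oncommit(new Noop, true, f);   // completed inline with HAVE_PGLOCK
  dispatch_oncommit(new Noop, false, f);  // queued; a finisher thread would run it later
  return 0;
}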
@@ -6966,10 +6979,12 @@ int BlueStore::queue_transactions(

// prepare
TransContext *txc = _txc_create(osr);
std::atomic_int *kv_oncommit;
kv_oncommit = &txc->osr->kv_doing_oncommit;
++txc->osr->kv_finisher_submitting;
txc->onreadable = onreadable;
txc->onreadable_sync = onreadable_sync;
txc->oncommit = ondisk;
txc->oncommit = BlueStoreContext::create(ondisk, kv_oncommit);

for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
(*p).set_osr(osr);
33 changes: 32 additions & 1 deletion src/os/bluestore/BlueStore.h
@@ -1138,6 +1138,27 @@ class BlueStore : public ObjectStore,
class OpSequencer;
typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;

class BlueStoreContext : public Context {
Context *context;
std::atomic_int *counter;

BlueStoreContext(Context *ctx, std::atomic_int *cnt) {
context = ctx;
counter = cnt;
++(*counter);
}

void finish(int r) override {
if (context != NULL)
context->complete(r);
--(*counter);
}
public:
static BlueStoreContext *create(Context *ctx, std::atomic_int *cnt) {
return new BlueStoreContext(ctx, cnt);
}
};

struct TransContext {
typedef enum {
STATE_PREPARE,
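
A self-contained sketch of the counting-wrapper pattern that the BlueStoreContext class above implements: wrapping a context increments an atomic in-flight counter, and finishing the wrapper completes the inner context and then decrements it. The names below are illustrative stand-ins, not the real BlueStore types:

#include <atomic>
#include <cassert>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  virtual void complete(int r) { finish(r); delete this; }
};

struct CountingContext : public Context {
  Context *inner;
  std::atomic_int *counter;
  CountingContext(Context *c, std::atomic_int *cnt) : inner(c), counter(cnt) {
    ++(*counter);                // one more wrapped callback in flight
  }
  void finish(int r) override {
    if (inner)
      inner->complete(r);        // run the wrapped callback first
    --(*counter);                // then mark it as done
  }
};

struct Noop : public Context { void finish(int) override {} };

int main() {
  std::atomic_int in_flight{0};
  Context *c = new CountingContext(new Noop, &in_flight);
  assert(in_flight == 1);
  c->complete(0);
  assert(in_flight == 0);
  return 0;
}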
@@ -1324,6 +1345,8 @@ class BlueStore : public ObjectStore,
std::atomic_int kv_committing_serially = {0};

std::atomic_int kv_finisher_submitting = {0};
std::atomic_int kv_doing_oncommit = {0};
std::atomic_int kv_doing_oncommits = {0};

OpSequencer()
//set the qlock to PTHREAD_MUTEX_RECURSIVE mode
@@ -1333,6 +1356,13 @@
assert(q.empty());
}

bool is_finisher_done(void) {
if (kv_doing_oncommit == 1 && kv_doing_oncommits == 0) {
return true;
}
return false;
}

void queue_new(TransContext *txc) {
std::lock_guard<std::mutex> l(qlock);
txc->seq = ++last_seq;
@@ -1355,7 +1385,8 @@
return true;
}
assert(txc->state < TransContext::STATE_KV_DONE);
txc->oncommits.push_back(c);
txc->oncommits.push_back(BlueStoreContext::create(c,
&txc->osr->kv_doing_oncommits));
return false;
}

14 changes: 9 additions & 5 deletions src/osd/PG.h
@@ -528,12 +528,14 @@ class PG : DoutPrefixProvider {
eversion_t v;
C_UpdateLastRollbackInfoTrimmedToApplied(PG *pg, epoch_t e, eversion_t v)
: pg(pg), e(e), v(v) {}
void finish(int) {
pg->lock();
void finish(int r) {
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (!pg->pg_has_reset_since(e)) {
pg->last_rollback_info_trimmed_to_applied = v;
}
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};
// entries <= last_rollback_info_trimmed_to_applied have been trimmed,
@@ -1352,13 +1354,15 @@ class PG : DoutPrefixProvider {
QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
pg(pg), epoch(epoch), evt(evt) {}
void finish(int r) {
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
pg->queue_peering_event(PG::CephPeeringEvtRef(
new PG::CephPeeringEvt(
epoch,
epoch,
evt)));
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};

60 changes: 39 additions & 21 deletions src/osd/ReplicatedPG.cc
@@ -86,11 +86,13 @@ struct ReplicatedPG::C_OSD_OnApplied : Context {
epoch_t epoch,
eversion_t v)
: pg(pg), epoch(epoch), v(v) {}
void finish(int) override {
pg->lock();
void finish(int r) override {
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (!pg->pg_has_reset_since(epoch))
pg->op_applied(v);
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};

@@ -150,12 +152,14 @@ class ReplicatedPG::BlessedContext : public Context {
BlessedContext(ReplicatedPG *pg, Context *c, epoch_t e)
: pg(pg), c(c), e(e) {}
void finish(int r) {
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (pg->pg_has_reset_since(e))
delete c;
else
c->complete(r);
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};

@@ -2633,16 +2637,19 @@ struct C_ProxyRead : public Context {
void finish(int r) {
if (prdop->canceled)
return;
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (prdop->canceled) {
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
return;
}
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->finish_proxy_read(oid, tid, r);
pg->osd->logger->tinc(l_osd_tier_r_lat, ceph_clock_now(NULL) - start);
}
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};

@@ -2828,15 +2835,18 @@ struct C_ProxyWrite_Commit : public Context {
void finish(int r) {
if (pwop->canceled)
return;
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (pwop->canceled) {
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
return;
}
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->finish_proxy_write(oid, tid, r);
}
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};

@@ -7151,11 +7161,13 @@ struct C_Copyfrom : public Context {
void finish(int r) {
if (r == -ECANCELED)
return;
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->process_copy_chunk(oid, tid, r);
}
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};

@@ -8086,12 +8098,14 @@ struct C_Flush : public Context {
void finish(int r) {
if (r == -ECANCELED)
return;
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->finish_flush(oid, tid, r);
pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now(NULL) - start);
}
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};

@@ -8905,8 +8919,9 @@ void ReplicatedPG::submit_log_entries(
ceph_tid_t rep_tid,
epoch_t epoch)
: pg(pg), rep_tid(rep_tid), epoch(epoch) {}
void finish(int) override {
pg->lock();
void finish(int r) override {
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (!pg->pg_has_reset_since(epoch)) {
auto it = pg->log_entry_update_waiting_on.find(rep_tid);
assert(it != pg->log_entry_update_waiting_on.end());
@@ -8919,7 +8934,8 @@ void ReplicatedPG::submit_log_entries(
pg->log_entry_update_waiting_on.erase(it);
}
}
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};
t.register_on_complete(
@@ -8937,11 +8953,13 @@ void ReplicatedPG::submit_log_entries(
: pg(pg),
on_complete(std::move(on_complete)),
epoch(epoch) {}
void finish(int) override {
pg->lock();
void finish(int r) override {
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (!pg->pg_has_reset_since(epoch))
on_complete();
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
};
t.register_on_complete(
20 changes: 12 additions & 8 deletions src/osd/Watch.cc
@@ -72,11 +72,11 @@ class NotifyTimeoutCB : public CancelableContext {
bool canceled; // protected by notif lock
public:
explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
void finish(int) {
void finish(int r) {
notif->osd->watch_lock.Unlock();
notif->lock.Lock();
if (!canceled)
notif->do_timeout(); // drops lock
notif->do_timeout(r); // drops lock
else
notif->lock.Unlock();
notif->osd->watch_lock.Lock();
@@ -87,7 +87,7 @@ class NotifyTimeoutCB : public CancelableContext {
}
};

void Notify::do_timeout()
void Notify::do_timeout(int r)
{
assert(lock.is_locked_by_me());
dout(10) << "timeout" << dendl;
@@ -108,11 +108,13 @@ void Notify::do_timeout()
i != _watchers.end();
++i) {
boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg());
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
if (!(*i)->is_discarded()) {
(*i)->cancel_notify(self.lock());
}
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
}
}

@@ -239,17 +241,19 @@ class HandleWatchTimeout : public CancelableContext {
canceled = true;
}
void finish(int) { assert(0); /* not used */ }
void complete(int) {
void complete(int r) {
dout(10) << "HandleWatchTimeout" << dendl;
boost::intrusive_ptr<ReplicatedPG> pg(watch->pg);
OSDService *osd(watch->osd);
osd->watch_lock.Unlock();
pg->lock();
if (r != Context::HAVE_PGLOCK)
pg->lock();
watch->cb = NULL;
if (!watch->is_discarded() && !canceled)
watch->pg->handle_watch_timeout(watch);
delete this; // ~Watch requires pg lock!
pg->unlock();
if (r != Context::HAVE_PGLOCK)
pg->unlock();
osd->watch_lock.Lock();
}
};
2 changes: 1 addition & 1 deletion src/osd/Watch.h
@@ -80,7 +80,7 @@ class Notify {
void maybe_complete_notify();

/// Called on Notify timeout
void do_timeout();
void do_timeout(int r);

Notify(
ConnectionRef client,
