Skip to content

Commit

Permalink
Merge remote-tracking branch 'somnathr/wip-optracker-optimization' in…
Browse files Browse the repository at this point in the history
…to giant
  • Loading branch information
Samuel Just committed Sep 19, 2014
2 parents f3acae4 + 11082f7 commit 184773d
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 75 deletions.
180 changes: 117 additions & 63 deletions src/common/TrackedOp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static ostream& _prefix(std::ostream* _dout)

void OpHistory::on_shutdown()
{
Mutex::Locker history_lock(ops_history_lock);
arrived.clear();
duration.clear();
shutdown = true;
Expand All @@ -39,6 +40,8 @@ void OpHistory::insert(utime_t now, TrackedOpRef op)
{
if (shutdown)
return;

Mutex::Locker history_lock(ops_history_lock);
duration.insert(make_pair(op->get_duration(), op));
arrived.insert(make_pair(op->get_initiated(), op));
cleanup(now);
Expand All @@ -65,6 +68,7 @@ void OpHistory::cleanup(utime_t now)

void OpHistory::dump_ops(utime_t now, Formatter *f)
{
Mutex::Locker history_lock(ops_history_lock);
cleanup(now);
f->open_object_section("OpHistory");
f->dump_int("num to keep", history_size);
Expand All @@ -86,98 +90,141 @@ void OpHistory::dump_ops(utime_t now, Formatter *f)

void OpTracker::dump_historic_ops(Formatter *f)
{
Mutex::Locker locker(ops_in_flight_lock);
utime_t now = ceph_clock_now(cct);
history.dump_ops(now, f);
}

void OpTracker::dump_ops_in_flight(Formatter *f)
{
Mutex::Locker locker(ops_in_flight_lock);
f->open_object_section("ops_in_flight"); // overall dump
f->dump_int("num_ops", ops_in_flight.size());
uint64_t total_ops_in_flight = 0;
f->open_array_section("ops"); // list of TrackedOps
utime_t now = ceph_clock_now(cct);
for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
f->open_object_section("op");
(*p)->dump(now, f);
f->close_section(); // this TrackedOp
for (uint32_t i = 0; i < num_optracker_shards; i++) {
ShardedTrackingData* sdata = sharded_in_flight_list[i];
assert(NULL != sdata);
Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
for (xlist<TrackedOp*>::iterator p = sdata->ops_in_flight_sharded.begin(); !p.end(); ++p) {
f->open_object_section("op");
(*p)->dump(now, f);
f->close_section(); // this TrackedOp
total_ops_in_flight++;
}
}
f->close_section(); // list of TrackedOps
f->dump_int("num_ops", total_ops_in_flight);
f->close_section(); // overall dump
}

void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i)
{
if (!tracking_enabled)
return;
Mutex::Locker locker(ops_in_flight_lock);
ops_in_flight.push_back(i);
ops_in_flight.back()->seq = seq++;

uint64_t current_seq = seq.inc();
uint32_t shard_index = current_seq % num_optracker_shards;
ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
assert(NULL != sdata);
{
Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
sdata->ops_in_flight_sharded.push_back(i);
sdata->ops_in_flight_sharded.back()->seq = current_seq;
}
}

void OpTracker::unregister_inflight_op(TrackedOp *i)
{
// caller checks;
assert(tracking_enabled);

Mutex::Locker locker(ops_in_flight_lock);
assert(i->xitem.get_list() == &ops_in_flight);
uint32_t shard_index = i->seq % num_optracker_shards;
ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
assert(NULL != sdata);
{
Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
assert(i->xitem.get_list() == &sdata->ops_in_flight_sharded);
i->xitem.remove_myself();
}
i->_unregistered();
utime_t now = ceph_clock_now(cct);
i->xitem.remove_myself();
history.insert(now, TrackedOpRef(i));
}

bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
{
Mutex::Locker locker(ops_in_flight_lock);
if (!ops_in_flight.size()) // this covers tracking_enabled, too
return false;

utime_t now = ceph_clock_now(cct);
utime_t too_old = now;
too_old -= complaint_time;
utime_t oldest_op;
uint64_t total_ops_in_flight = 0;
bool got_first_op = false;

for (uint32_t i = 0; i < num_optracker_shards; i++) {
ShardedTrackingData* sdata = sharded_in_flight_list[i];
assert(NULL != sdata);
Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
if (!sdata->ops_in_flight_sharded.empty()) {
utime_t oldest_op_tmp = sdata->ops_in_flight_sharded.front()->get_initiated();
if (!got_first_op) {
oldest_op = oldest_op_tmp;
got_first_op = true;
} else if (oldest_op_tmp < oldest_op) {
oldest_op = oldest_op_tmp;
}
}
total_ops_in_flight += sdata->ops_in_flight_sharded.size();
}

if (0 == total_ops_in_flight)
return false;

utime_t oldest_secs = now - ops_in_flight.front()->get_initiated();
utime_t oldest_secs = now - oldest_op;

dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
dout(10) << "ops_in_flight.size: " << total_ops_in_flight
<< "; oldest is " << oldest_secs
<< " seconds old" << dendl;

if (oldest_secs < complaint_time)
return false;

xlist<TrackedOp*>::iterator i = ops_in_flight.begin();
warning_vector.reserve(log_threshold + 1);

int slow = 0; // total slow
int warned = 0; // total logged
while (!i.end() && (*i)->get_initiated() < too_old) {
slow++;
for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
ShardedTrackingData* sdata = sharded_in_flight_list[iter];
assert(NULL != sdata);
Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
if (sdata->ops_in_flight_sharded.empty())
continue;
xlist<TrackedOp*>::iterator i = sdata->ops_in_flight_sharded.begin();
while (!i.end() && (*i)->get_initiated() < too_old) {
slow++;

// exponential backoff of warning intervals
if (((*i)->get_initiated() +
// exponential backoff of warning intervals
if (((*i)->get_initiated() +
(complaint_time * (*i)->warn_interval_multiplier)) < now) {
// will warn
if (warning_vector.empty())
warning_vector.push_back("");
warned++;
if (warned > log_threshold)
break;

utime_t age = now - (*i)->get_initiated();
stringstream ss;
ss << "slow request " << age << " seconds old, received at "
<< (*i)->get_initiated() << ": ";
(*i)->_dump_op_descriptor_unlocked(ss);
ss << " currently "
<< ((*i)->current.size() ? (*i)->current : (*i)->state_string());
warning_vector.push_back(ss.str());

// only those that have been shown will backoff
(*i)->warn_interval_multiplier *= 2;
if (warning_vector.empty())
warning_vector.push_back("");
warned++;
if (warned > log_threshold)
break;

utime_t age = now - (*i)->get_initiated();
stringstream ss;
ss << "slow request " << age << " seconds old, received at "
<< (*i)->get_initiated() << ": ";
(*i)->_dump_op_descriptor_unlocked(ss);
ss << " currently "
<< ((*i)->current.size() ? (*i)->current : (*i)->state_string());
warning_vector.push_back(ss.str());

// only those that have been shown will backoff
(*i)->warn_interval_multiplier *= 2;
}
++i;
}
++i;
}

// only summarize if we warn about any. if everything has backed
Expand All @@ -194,28 +241,34 @@ bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)

void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
{
Mutex::Locker locker(ops_in_flight_lock);

h->clear();

utime_t now = ceph_clock_now(NULL);
unsigned bin = 30;
uint32_t lb = 1 << (bin-1); // lower bound for this bin
int count = 0;
for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) {
utime_t age = now - (*i)->get_initiated();
uint32_t ms = (long)(age * 1000.0);
if (ms >= lb) {
count++;
continue;
}
if (count)
h->set_bin(bin, count);
while (lb > ms) {
bin--;
lb >>= 1;

for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
ShardedTrackingData* sdata = sharded_in_flight_list[iter];
assert(NULL != sdata);
Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);

for (xlist<TrackedOp*>::iterator i = sdata->ops_in_flight_sharded.begin();
!i.end(); ++i) {
utime_t age = now - (*i)->get_initiated();
uint32_t ms = (long)(age * 1000.0);
if (ms >= lb) {
count++;
continue;
}
if (count)
h->set_bin(bin, count);
while (lb > ms) {
bin--;
lb >>= 1;
}
count = 1;
}
count = 1;
}
if (count)
h->set_bin(bin, count);
Expand All @@ -231,18 +284,19 @@ void OpTracker::mark_event(TrackedOp *op, const string &dest, utime_t time)
void OpTracker::_mark_event(TrackedOp *op, const string &evt,
utime_t time)
{
stringstream ss;
op->_dump_op_descriptor_unlocked(ss);
dout(5) << //"reqid: " << op->get_reqid() <<
", seq: " << op->seq
dout(5);
*_dout << "seq: " << op->seq
<< ", time: " << time << ", event: " << evt
<< ", op: " << ss.str() << dendl;
<< ", op: ";
op->_dump_op_descriptor_unlocked(*_dout);
*_dout << dendl;

}

void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) {
op->mark_event("done");
op->_unregistered();
if (!tracker->tracking_enabled) {
op->_unregistered();
delete op;
return;
}
Expand Down
40 changes: 30 additions & 10 deletions src/common/TrackedOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ class OpTracker;
class OpHistory {
set<pair<utime_t, TrackedOpRef> > arrived;
set<pair<double, TrackedOpRef> > duration;
Mutex ops_history_lock;
void cleanup(utime_t now);
bool shutdown;
uint32_t history_size;
uint32_t history_duration;

public:
OpHistory() : shutdown(false),
OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
history_size(0), history_duration(0) {}
~OpHistory() {
assert(arrived.empty());
Expand All @@ -59,9 +60,15 @@ class OpTracker {
};
friend class RemoveOnDelete;
friend class OpHistory;
uint64_t seq;
Mutex ops_in_flight_lock;
xlist<TrackedOp *> ops_in_flight;
atomic64_t seq;
struct ShardedTrackingData {
Mutex ops_in_flight_lock_sharded;
xlist<TrackedOp *> ops_in_flight_sharded;
ShardedTrackingData(string lock_name):
ops_in_flight_lock_sharded(lock_name.c_str()) {}
};
vector<ShardedTrackingData*> sharded_in_flight_list;
uint32_t num_optracker_shards;
OpHistory history;
float complaint_time;
int log_threshold;
Expand All @@ -70,9 +77,19 @@ class OpTracker {
public:
bool tracking_enabled;
CephContext *cct;
OpTracker(CephContext *cct_, bool tracking) : seq(0), ops_in_flight_lock("OpTracker mutex"),
complaint_time(0), log_threshold(0),
tracking_enabled(tracking), cct(cct_) {}
OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards) : seq(0),
num_optracker_shards(num_shards),
complaint_time(0), log_threshold(0),
tracking_enabled(tracking), cct(cct_) {

for (uint32_t i = 0; i < num_optracker_shards; i++) {
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s:%d", "OpTracker::ShardedLock", i);
ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
sharded_in_flight_list.push_back(one_shard);
}
}

void set_complaint_and_threshold(float time, int threshold) {
complaint_time = time;
log_threshold = threshold;
Expand Down Expand Up @@ -100,11 +117,14 @@ class OpTracker {
utime_t time = ceph_clock_now(g_ceph_context));

void on_shutdown() {
Mutex::Locker l(ops_in_flight_lock);
history.on_shutdown();
}
~OpTracker() {
assert(ops_in_flight.empty());
while (!sharded_in_flight_list.empty()) {
assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
delete sharded_in_flight_list.back();
sharded_in_flight_list.pop_back();
}
}

template <typename T, typename U>
Expand All @@ -126,7 +146,7 @@ class TrackedOp {

utime_t initiated_at;
list<pair<utime_t, string> > events; /// list of events and their times
Mutex lock; /// to protect the events list
mutable Mutex lock; /// to protect the events list
string current; /// the current state the event is in
uint64_t seq; /// a unique value set by the OpTracker

Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ OPTION(osd_debug_verify_stray_on_activate, OPT_BOOL, false)
OPTION(osd_debug_skip_full_check_in_backfill_reservation, OPT_BOOL, false)
OPTION(osd_debug_reject_backfill_probability, OPT_DOUBLE, 0)
OPTION(osd_enable_op_tracker, OPT_BOOL, true) // enable/disable OSD op tracking
OPTION(osd_num_op_tracker_shard, OPT_U32, 32) // The number of shards for holding the ops
OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track
OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track
OPTION(osd_target_transaction_size, OPT_INT, 30) // to adjust various transactions that batch smaller items
Expand Down
3 changes: 2 additions & 1 deletion src/mds/MDS.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
messenger(m),
monc(mc),
log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
op_tracker(cct, m->cct->_conf->mds_enable_op_tracker),
op_tracker(cct, m->cct->_conf->mds_enable_op_tracker,
m->cct->_conf->osd_num_op_tracker_shard),
finisher(cct),
sessionmap(this),
progress_thread(this),
Expand Down
1 change: 1 addition & 0 deletions src/mds/Mutation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ void MDRequestImpl::_dump(utime_t now, Formatter *f) const
}
{
f->open_array_section("events");
Mutex::Locker l(lock);
for (list<pair<utime_t, string> >::const_iterator i = events.begin();
i != events.end();
++i) {
Expand Down

0 comments on commit 184773d

Please sign in to comment.