Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common/OpHistory: move insert/cleanup into separate thread #20540

Merged
merged 6 commits into from
Feb 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions src/common/TrackedOp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,63 @@ static ostream& _prefix(std::ostream* _dout)
return *_dout << "-- op tracker -- ";
}

void OpHistoryServiceThread::break_thread() {
queue_spinlock.lock();
_external_queue.clear();
_break_thread = true;
queue_spinlock.unlock();
}

void* OpHistoryServiceThread::entry() {
int sleep_time = 1000;
list<pair<utime_t, TrackedOpRef>> internal_queue;
while (1) {
queue_spinlock.lock();
if (_break_thread) {
queue_spinlock.unlock();
break;
}
internal_queue.swap(_external_queue);
queue_spinlock.unlock();
if (internal_queue.empty()) {
usleep(sleep_time);
if (sleep_time < 128000) {
sleep_time <<= 2;
}
} else {
sleep_time = 1000;
}

while (!internal_queue.empty()) {
pair<utime_t, TrackedOpRef> op = internal_queue.front();
_ophistory->_insert_delayed(op.first, op.second);
internal_queue.pop_front();
}
}
return nullptr;
}


// Shut the history down: stop the service thread first, then drop every
// op recorded in the history containers and mark the history as shut down.
void OpHistory::on_shutdown()
{
// Tell the service thread to leave its loop and wait until it has
// finished before the containers are emptied.
opsvc.break_thread();
opsvc.join();
// Held for the rest of the function.
Mutex::Locker history_lock(ops_history_lock);
// ~OpHistory() asserts that these three containers are empty.
arrived.clear();
duration.clear();
slow_op.clear();
// Once set, _insert_delayed() returns without recording anything.
shutdown = true;
}

void OpHistory::insert(utime_t now, TrackedOpRef op)
void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
{
Mutex::Locker history_lock(ops_history_lock);
if (shutdown)
return;
duration.insert(make_pair(op->get_duration(), op));
double opduration = op->get_duration();
duration.insert(make_pair(opduration, op));
arrived.insert(make_pair(op->get_initiated(), op));
if (op->get_duration() >= history_slow_op_threshold)
if (opduration >= history_slow_op_threshold)
slow_op.insert(make_pair(op->get_initiated(), op));
cleanup(now);
}
Expand Down Expand Up @@ -434,7 +474,7 @@ void TrackedOp::mark_event_string(const string &event, utime_t stamp)

{
Mutex::Locker l(lock);
events.push_back(Event(stamp, event));
events.emplace_back(stamp, event);
current = events.back().c_str();
}
dout(6) << " seq: " << seq
Expand All @@ -452,7 +492,7 @@ void TrackedOp::mark_event(const char *event, utime_t stamp)

{
Mutex::Locker l(lock);
events.push_back(Event(stamp, event));
events.emplace_back(stamp, event);
current = event;
}
dout(6) << " seq: " << seq
Expand Down
50 changes: 44 additions & 6 deletions src/common/TrackedOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,68 @@
#define OPTRACKER_PREALLOC_EVENTS 20

class TrackedOp;
class OpHistory;

typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;

/**
 * Service thread that moves ops into an OpHistory on behalf of callers.
 *
 * Callers append (timestamp, op) pairs with insert_op(); entry() takes the
 * queued pairs and passes them to OpHistory::_insert_delayed().  The queue
 * and the stop flag are guarded by queue_spinlock.
 */
class OpHistoryServiceThread : public Thread
{
private:
  // Ops queued by insert_op() and not yet picked up by entry().
  list<pair<utime_t, TrackedOpRef>> _external_queue;
  // History that receives the queued ops; not owned by this object.
  OpHistory* _ophistory;
  // Guards _external_queue and _break_thread.
  mutable ceph::spinlock queue_spinlock;
  // Set by break_thread(); never reset in this class.
  bool _break_thread;
public:
  explicit OpHistoryServiceThread(OpHistory* parent)
    : _ophistory(parent),
      _break_thread(false) { }

  // Request that entry() stops and drop whatever is still queued.
  void break_thread();

  /**
   * Queue an op for later insertion into the history.
   *
   * @param now timestamp stored alongside the op
   * @param op  op to queue
   *
   * Once break_thread() has been called the op is not queued: entry()
   * stops taking entries from the queue at that point, so anything added
   * afterwards would stay there, holding its reference, until this
   * object is destroyed.
   */
  void insert_op(const utime_t& now, TrackedOpRef op) {
    queue_spinlock.lock();
    if (!_break_thread) {
      _external_queue.emplace_back(now, op);
    }
    queue_spinlock.unlock();
  }

  void *entry() override;
};


class OpHistory {
set<pair<utime_t, TrackedOpRef> > arrived;
set<pair<double, TrackedOpRef> > duration;
set<pair<utime_t, TrackedOpRef> > slow_op;
Mutex ops_history_lock;
void cleanup(utime_t now);
bool shutdown;
uint32_t history_size;
uint32_t history_duration;
uint32_t history_slow_op_size;
uint32_t history_slow_op_threshold;
std::atomic_bool shutdown;
OpHistoryServiceThread opsvc;
friend class OpHistoryServiceThread;

public:
OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
OpHistory() : ops_history_lock("OpHistory::Lock"),
history_size(0), history_duration(0),
history_slow_op_size(0), history_slow_op_threshold(0) {}
history_slow_op_size(0), history_slow_op_threshold(0),
shutdown(false), opsvc(this) {
opsvc.create("OpHistorySvc");
}
~OpHistory() {
assert(arrived.empty());
assert(duration.empty());
assert(slow_op.empty());
}
void insert(utime_t now, TrackedOpRef op);
void insert(const utime_t& now, TrackedOpRef op)
{
if (shutdown)
return;

opsvc.insert_op(now, op);
}

void _insert_delayed(const utime_t& now, TrackedOpRef op);
void dump_ops(utime_t now, Formatter *f, set<string> filters = {""});
void dump_ops_by_duration(utime_t now, Formatter *f, set<string> filters = {""});
void dump_slow_ops(utime_t now, Formatter *f, set<string> filters = {""});
Expand All @@ -65,8 +103,8 @@ class OpTracker {
friend class OpHistory;
std::atomic<int64_t> seq = { 0 };
vector<ShardedTrackingData*> sharded_in_flight_list;
uint32_t num_optracker_shards;
OpHistory history;
uint32_t num_optracker_shards;
float complaint_time;
int log_threshold;
std::atomic<bool> tracking_enabled;
Expand Down Expand Up @@ -332,7 +370,7 @@ class TrackedOp : public boost::intrusive::list_base_hook<> {

void tracking_start() {
if (tracker->register_inflight_op(this)) {
events.push_back(Event(initiated_at, "initiated"));
events.emplace_back(initiated_at, "initiated");
state = STATE_LIVE;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,

Monitor::~Monitor()
{
op_tracker.on_shutdown();

for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
delete *p;
delete config_key_service;
Expand Down
3 changes: 1 addition & 2 deletions src/mon/OSDMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ OSDMonitor::OSDMonitor(
inc_osd_cache(g_conf->mon_osd_cache_size),
full_osd_cache(g_conf->mon_osd_cache_size),
last_attempted_minwait_time(utime_t()),
mapper(mn->cct, &mn->cpu_tp),
op_tracker(cct, true, 1)
mapper(mn->cct, &mn->cpu_tp)
{}

bool OSDMonitor::_have_pending_crush()
Expand Down
2 changes: 0 additions & 2 deletions src/mon/OSDMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,6 @@ class OSDMonitor : public PaxosService {
bool preprocess_remove_snaps(MonOpRequestRef op);
bool prepare_remove_snaps(MonOpRequestRef op);

OpTracker op_tracker;

int load_metadata(int osd, map<string, string>& m, ostream *err);
void count_metadata(const string& field, Formatter *f);
public:
Expand Down