Merge pull request #53033 from rhcs-dashboard/wip-62025-reef
reef: msgr: AsyncMessenger add faulted connections metrics
jmolmo committed Oct 10, 2023
2 parents a01c257 + 83535ef commit 4638ce2
Showing 13 changed files with 86 additions and 35 deletions.
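
In summary: each AsyncMessenger worker gains a second, labeled PerfCounters instance keyed by its worker id, carrying two new counters (msgr_connection_ready_timeouts and msgr_connection_idle_timeouts) that AsyncConnection::tick() bumps when it faults a connection; MgrClient stops reporting labeled counters to the mgr; and the mgr Python helper get_all_perf_counters() is renamed to get_unlabeled_perf_counters() across its callers to reflect that.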
35 changes: 21 additions & 14 deletions src/mgr/MgrClient.cc
@@ -14,6 +14,7 @@

#include "MgrClient.h"

#include "common/perf_counters_key.h"
#include "mgr/MgrContext.h"
#include "mon/MonMap.h"

@@ -331,6 +332,12 @@ void MgrClient::_send_report()
const PerfCounters::perf_counter_data_any_d &ctr,
const PerfCounters &perf_counters)
{
// FIXME: We don't send labeled perf counters to the mgr currently.
auto labels = ceph::perf_counters::key_labels(perf_counters.get_name());
if (labels.begin() != labels.end()) {
return false;
}

return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
};
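
The filter above leans on the flat-key encoding from common/perf_counters_key.h: key_create() packs a counter name and its label pairs into a single key string, and key_labels() iterates the pairs back out, yielding an empty range for an unlabeled name. A minimal sketch of that round trip, reusing only the two call shapes already visible in this diff (the names are illustrative):

    #include "common/perf_counters_key.h"

    #include <iostream>
    #include <string>

    int main() {
      // Same call shape as the Worker constructor below: name + label pairs.
      std::string labeled = ceph::perf_counters::key_create(
          "AsyncMessenger::Worker", {{"id", "0"}});

      // Same call shape as the MgrClient filter above.
      auto labels = ceph::perf_counters::key_labels(labeled);
      std::cout << (labels.begin() != labels.end()) << "\n";  // 1: has labels

      auto none = ceph::perf_counters::key_labels("AsyncMessenger::Worker");
      std::cout << (none.begin() != none.end()) << "\n";      // 0: plain name
    }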

@@ -367,20 +374,20 @@ void MgrClient::_send_report()
}

if (session->declared.count(path) == 0) {
ldout(cct,20) << " declare " << path << dendl;
PerfCounterType type;
type.path = path;
if (data.description) {
type.description = data.description;
}
if (data.nick) {
type.nick = data.nick;
}
type.type = data.type;
type.priority = perf_counters.get_adjusted_priority(data.prio);
type.unit = data.unit;
report->declare_types.push_back(std::move(type));
session->declared.insert(path);
ldout(cct, 20) << " declare " << path << dendl;
PerfCounterType type;
type.path = path;
if (data.description) {
type.description = data.description;
}
if (data.nick) {
type.nick = data.nick;
}
type.type = data.type;
type.priority = perf_counters.get_adjusted_priority(data.prio);
type.unit = data.unit;
report->declare_types.push_back(std::move(type));
session->declared.insert(path);
}

encode(static_cast<uint64_t>(data.u64), report->packed);
3 changes: 3 additions & 0 deletions src/msg/async/AsyncConnection.cc
@@ -116,6 +116,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
: Connection(cct, m),
delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()),
labeled_logger(w->get_labeled_perf_counter()),
state(STATE_NONE), port(-1),
dispatch_queue(q), recv_buf(NULL),
recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
@@ -791,6 +792,7 @@ void AsyncConnection::tick(uint64_t id)
<< target_addr << ", fault."
<< dendl;
protocol->fault();
labeled_logger->inc(l_msgr_connection_ready_timeouts);
} else {
last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
}
@@ -803,6 +805,7 @@
<< " us, fault."
<< dendl;
protocol->fault();
labeled_logger->inc(l_msgr_connection_idle_timeouts);
} else {
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
}
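
Both increments sit on the fault paths of AsyncConnection::tick(): the first fires when a connection has not become ready within the connect timeout, the second when an established connection has been idle past the inactive timeout. A self-contained sketch of that decision (field and parameter names simplified from the real tick() state; not a drop-in implementation):

    #include <cstdint>

    enum class TickFault { None, ReadyTimeout, IdleTimeout };

    // Mirrors the two branches above: an unready connection that exceeded the
    // connect window counts as a "ready timeout"; a ready but silent one that
    // exceeded the inactive window counts as an "idle timeout".
    TickFault classify_tick(bool protocol_ready, uint64_t now_us,
                            uint64_t connect_start_us, uint64_t last_active_us,
                            uint64_t connect_timeout_us,
                            uint64_t inactive_timeout_us) {
      if (!protocol_ready) {
        if (now_us - connect_start_us >= connect_timeout_us)
          return TickFault::ReadyTimeout;  // -> l_msgr_connection_ready_timeouts
      } else if (now_us - last_active_us >= inactive_timeout_us) {
        return TickFault::IdleTimeout;     // -> l_msgr_connection_idle_timeouts
      }
      return TickFault::None;              // re-arm the tick timer instead
    }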
1 change: 1 addition & 0 deletions src/msg/async/AsyncConnection.h
@@ -173,6 +173,7 @@ class AsyncConnection : public Connection {
AsyncMessenger *async_msgr;
uint64_t conn_id;
PerfCounters *logger;
PerfCounters *labeled_logger;
int state;
ConnectedSocket cs;
int port;
1 change: 1 addition & 0 deletions src/msg/async/ProtocolV1.cc
@@ -2414,6 +2414,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
existing->worker->references--;
new_worker->references++;
existing->logger = new_worker->get_perf_counter();
existing->labeled_logger = new_worker->get_labeled_perf_counter();
existing->worker = new_worker;
existing->center = new_center;
if (existing->delay_state)
1 change: 1 addition & 0 deletions src/msg/async/ProtocolV2.cc
@@ -2808,6 +2808,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
existing->worker->references--;
new_worker->references++;
existing->logger = new_worker->get_perf_counter();
existing->labeled_logger = new_worker->get_labeled_perf_counter();
existing->worker = new_worker;
existing->center = new_center;
if (existing->delay_state)
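
Note that ProtocolV1::replace() and ProtocolV2::reuse_connection() both re-point labeled_logger alongside logger when a surviving connection migrates to another worker; without this, later timeout increments would be attributed to the old worker's id label.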
46 changes: 42 additions & 4 deletions src/msg/async/Stack.h
@@ -17,10 +17,12 @@
#ifndef CEPH_MSG_ASYNC_STACK_H
#define CEPH_MSG_ASYNC_STACK_H

#include "include/spinlock.h"
#include "common/perf_counters.h"
#include "msg/msg_types.h"
#include "common/perf_counters_key.h"
#include "include/spinlock.h"
#include "msg/async/Event.h"
#include "msg/msg_types.h"
#include <string>

class Worker;
class ConnectedSocketImpl {
@@ -214,6 +216,15 @@ enum {
l_msgr_last,
};

enum {
l_msgr_labeled_first = l_msgr_last + 1,

l_msgr_connection_ready_timeouts,
l_msgr_connection_idle_timeouts,

l_msgr_labeled_last,
};

class Worker {
std::mutex init_lock;
std::condition_variable init_cond;
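
The sentinel pattern here mirrors the unlabeled enum above it: each PerfCounters object owns a range of integer indices bounded by first/last sentinels, and starting the labeled range at l_msgr_last + 1 keeps the worker's two loggers from ever sharing an index.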
@@ -224,6 +235,7 @@ class Worker {

CephContext *cct;
PerfCounters *perf_logger;
PerfCounters *perf_labeled_logger;
unsigned id;

std::atomic_uint references;
@@ -233,9 +245,11 @@
Worker& operator=(const Worker&) = delete;

Worker(CephContext *c, unsigned worker_id)
: cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
: cct(c), id(worker_id), references(0), center(c) {
char name[128];
sprintf(name, "AsyncMessenger::Worker-%u", id);
char name_prefix[] = "AsyncMessenger::Worker";
sprintf(name, "%s-%u", name_prefix, id);

// initialize perf_logger
PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);

@@ -259,12 +273,35 @@

perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);

// Add labeled perfcounters
std::string labels = ceph::perf_counters::key_create(
name_prefix, {{"id", std::to_string(id)}});
PerfCountersBuilder plb_labeled(
cct, labels, l_msgr_labeled_first,
l_msgr_labeled_last);

plb_labeled.add_u64_counter(
l_msgr_connection_ready_timeouts, "msgr_connection_ready_timeouts",
"Number of not yet ready connections declared as dead", NULL,
PerfCountersBuilder::PRIO_USEFUL);
plb_labeled.add_u64_counter(
l_msgr_connection_idle_timeouts, "msgr_connection_idle_timeouts",
"Number of connections closed due to idleness", NULL,
PerfCountersBuilder::PRIO_USEFUL);

perf_labeled_logger = plb_labeled.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_labeled_logger);
}
virtual ~Worker() {
if (perf_logger) {
cct->get_perfcounters_collection()->remove(perf_logger);
delete perf_logger;
}
if (perf_labeled_logger) {
cct->get_perfcounters_collection()->remove(perf_labeled_logger);
delete perf_labeled_logger;
}
}

virtual int listen(entity_addr_t &addr, unsigned addr_slot,
@@ -275,6 +312,7 @@ class Worker {

virtual void initialize() {}
PerfCounters *get_perf_counter() { return perf_logger; }
PerfCounters *get_labeled_perf_counter() { return perf_labeled_logger; }
void release_worker() {
int oldref = references.fetch_sub(1);
ceph_assert(oldref > 0);
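
The constructor/destructor pair above is the standard create-register/remove-delete lifecycle for a PerfCounters object. A condensed sketch of the labeled variant, reusing only calls that appear in this diff (the Demo::Worker name and l_demo_* indices are made up for illustration):

    #include "common/ceph_context.h"
    #include "common/perf_counters.h"
    #include "common/perf_counters_key.h"

    #include <string>

    enum {
      l_demo_first = 94000,  // arbitrary sentinel-bounded range, Ceph-style
      l_demo_events,
      l_demo_last,
    };

    PerfCounters *make_labeled_logger(CephContext *cct, unsigned id) {
      // The labels live in the key string, not in the builder itself.
      std::string key = ceph::perf_counters::key_create(
          "Demo::Worker", {{"id", std::to_string(id)}});
      PerfCountersBuilder plb(cct, key, l_demo_first, l_demo_last);
      plb.add_u64_counter(l_demo_events, "demo_events",
                          "Number of demo events observed", nullptr,
                          PerfCountersBuilder::PRIO_USEFUL);
      PerfCounters *logger = plb.create_perf_counters();
      cct->get_perfcounters_collection()->add(logger);
      return logger;  // owner must remove() from the collection and delete
    }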
2 changes: 1 addition & 1 deletion src/pybind/mgr/dashboard/controllers/perf_counters.py
@@ -79,4 +79,4 @@ class PerfCounters(RESTController):
@EndpointDoc("Display Perf Counters",
responses={200: PERF_SCHEMA})
def list(self):
return mgr.get_all_perf_counters()
return mgr.get_unlabeled_perf_counters()
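The same mechanical rename, get_all_perf_counters() to get_unlabeled_perf_counters(), repeats across the influx, mgr_module, prometheus, restful, telegraf, and telemetry hunks below; the telemetry module additionally renames its local all_perf_counters variables to match.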
2 changes: 1 addition & 1 deletion src/pybind/mgr/influx/module.py
@@ -253,7 +253,7 @@ def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[D
}

def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
for daemon, counters in self.get_all_perf_counters().items():
for daemon, counters in self.get_unlabeled_perf_counters().items():
svc_type, svc_id = daemon.split(".", 1)
metadata = self.get_metadata(svc_type, svc_id)
if metadata is not None:
2 changes: 1 addition & 1 deletion src/pybind/mgr/mgr_module.py
@@ -2022,7 +2022,7 @@ def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tu

@API.expose
@profile_method()
def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
def get_unlabeled_perf_counters(self, prio_limit: int = PRIO_USEFUL,
services: Sequence[str] = ("mds", "mon", "osd",
"rbd-mirror", "rgw",
"tcmu-runner")) -> Dict[str, dict]:
2 changes: 1 addition & 1 deletion src/pybind/mgr/prometheus/module.py
@@ -1652,7 +1652,7 @@ def get_perf_counters(self) -> None:
"""
Get the perf counters for all daemons
"""
for daemon, counters in self.get_all_perf_counters().items():
for daemon, counters in self.get_unlabeled_perf_counters().items():
for path, counter_info in counters.items():
# Skip histograms, they are represented by long running avgs
stattype = self._stattype_to_str(counter_info['type'])
2 changes: 1 addition & 1 deletion src/pybind/mgr/restful/api/perf.py
@@ -18,7 +18,7 @@ def get(self, **kwargs):
- 'daemon' -- filter by daemon, accepts Python regexp
"""

counters = context.instance.get_all_perf_counters()
counters = context.instance.get_unlabeled_perf_counters()

if 'daemon' in kwargs:
_re = re.compile(kwargs['daemon'])
2 changes: 1 addition & 1 deletion src/pybind/mgr/telegraf/module.py
@@ -72,7 +72,7 @@ def get_pool_stats(self) -> Iterable[Dict[str, Any]]:
}

def get_daemon_stats(self) -> Iterable[Dict[str, Any]]:
for daemon, counters in self.get_all_perf_counters().items():
for daemon, counters in self.get_unlabeled_perf_counters().items():
svc_type, svc_id = daemon.split('.', 1)
metadata = self.get_metadata(svc_type, svc_id)
if not metadata:
22 changes: 11 additions & 11 deletions src/pybind/mgr/telemetry/module.py
@@ -794,7 +794,7 @@ def gather_crashinfo(self) -> List[Dict[str, str]]:
return crashlist

def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
# Extract perf counter data with get_all_perf_counters(), a method
# Extract perf counter data with get_unlabeled_perf_counters(), a method
# from mgr/mgr_module.py. This method returns a nested dictionary that
# looks a lot like perf schema, except with some additional fields.
#
@@ -810,7 +810,7 @@ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
# "value": 88814109
# },
# },
all_perf_counters = self.get_all_perf_counters()
perf_counters = self.get_unlabeled_perf_counters()

# Initialize 'result' dict
result: Dict[str, dict] = defaultdict(lambda: defaultdict(
@@ -819,7 +819,7 @@ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
# 'separated' mode
anonymized_daemon_dict = {}

for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
for daemon, perf_counters_by_daemon in perf_counters.items():
daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'

if mode == 'separated':
@@ -836,7 +836,7 @@ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
else:
result[daemon_type]['num_combined_daemons'] += 1

for collection in all_perf_counters_by_daemon:
for collection in perf_counters_by_daemon:
# Split the collection to avoid redundancy in final report; i.e.:
# bluestore.kv_flush_lat, bluestore.kv_final_lat -->
# bluestore: kv_flush_lat, kv_final_lat
@@ -856,12 +856,12 @@ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
if mode == 'separated':
# Add value to result
result[daemon][col_0][col_1]['value'] = \
all_perf_counters_by_daemon[collection]['value']
perf_counters_by_daemon[collection]['value']

# Check that 'count' exists, as not all counters have a count field.
if 'count' in all_perf_counters_by_daemon[collection]:
if 'count' in perf_counters_by_daemon[collection]:
result[daemon][col_0][col_1]['count'] = \
all_perf_counters_by_daemon[collection]['count']
perf_counters_by_daemon[collection]['count']
elif mode == 'aggregated':
# Not every rgw daemon has the same schema. Specifically, each rgw daemon
# has a uniquely-named collection that starts off identically (i.e.
@@ -875,14 +875,14 @@ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
# the files are of type 'pair' (real-integer-pair, integer-integer pair).
# In those cases, the value is a dictionary, and not a number.
# i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number):
result[daemon_type][col_0][col_1]['value'] += \
all_perf_counters_by_daemon[collection]['value']
perf_counters_by_daemon[collection]['value']

# Check that 'count' exists, as not all counters have a count field.
if 'count' in all_perf_counters_by_daemon[collection]:
if 'count' in perf_counters_by_daemon[collection]:
result[daemon_type][col_0][col_1]['count'] += \
all_perf_counters_by_daemon[collection]['count']
perf_counters_by_daemon[collection]['count']
else:
self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
return {}
