reef: msgr: AsyncMessenger add faulted connections metrics #53033

Merged: 2 commits (Oct 10, 2023)
35 changes: 21 additions & 14 deletions src/mgr/MgrClient.cc
@@ -14,6 +14,7 @@

#include "MgrClient.h"

#include "common/perf_counters_key.h"
#include "mgr/MgrContext.h"
#include "mon/MonMap.h"

@@ -331,6 +332,12 @@ void MgrClient::_send_report()
const PerfCounters::perf_counter_data_any_d &ctr,
const PerfCounters &perf_counters)
{
// FIXME: We don't send labeled perf counters to the mgr currently.
auto labels = ceph::perf_counters::key_labels(perf_counters.get_name());
if (labels.begin() != labels.end()) {
return false;
}

return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
};

@@ -367,20 +374,20 @@ void MgrClient::_send_report()
}

if (session->declared.count(path) == 0) {
ldout(cct,20) << " declare " << path << dendl;
PerfCounterType type;
type.path = path;
if (data.description) {
type.description = data.description;
}
if (data.nick) {
type.nick = data.nick;
}
type.type = data.type;
type.priority = perf_counters.get_adjusted_priority(data.prio);
type.unit = data.unit;
report->declare_types.push_back(std::move(type));
session->declared.insert(path);
ldout(cct, 20) << " declare " << path << dendl;
PerfCounterType type;
type.path = path;
if (data.description) {
type.description = data.description;
}
if (data.nick) {
type.nick = data.nick;
}
type.type = data.type;
type.priority = perf_counters.get_adjusted_priority(data.prio);
type.unit = data.unit;
report->declare_types.push_back(std::move(type));
session->declared.insert(path);
}

encode(static_cast<uint64_t>(data.u64), report->packed);
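
Aside: the guard added above leans on the labeled-key helpers from common/perf_counters_key.h, the same API this PR uses in Stack.h further down. A minimal sketch of the round trip, compiled against the Ceph tree (the standalone main() is illustrative only, not part of the change):

#include "common/perf_counters_key.h"
#include <iostream>
#include <string>

int main() {
  // key_create() embeds label pairs into the counter-set name.
  std::string labeled = ceph::perf_counters::key_create(
      "AsyncMessenger::Worker", {{"id", "0"}});
  // key_labels() recovers them; an empty range means the set is unlabeled.
  auto with = ceph::perf_counters::key_labels(labeled);
  auto without = ceph::perf_counters::key_labels("AsyncMessenger::Worker-0");
  std::cout << (with.begin() != with.end()) << "\n";      // 1: labeled, _send_report() skips it
  std::cout << (without.begin() != without.end()) << "\n"; // 0: unlabeled, still reported
}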
3 changes: 3 additions & 0 deletions src/msg/async/AsyncConnection.cc
@@ -116,6 +116,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
: Connection(cct, m),
delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()),
labeled_logger(w->get_labeled_perf_counter()),
state(STATE_NONE), port(-1),
dispatch_queue(q), recv_buf(NULL),
recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
@@ -791,6 +792,7 @@ void AsyncConnection::tick(uint64_t id)
<< target_addr << ", fault."
<< dendl;
protocol->fault();
labeled_logger->inc(l_msgr_connection_ready_timeouts);
} else {
last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
}
@@ -803,6 +805,7 @@
<< " us, fault."
<< dendl;
protocol->fault();
labeled_logger->inc(l_msgr_connection_idle_timeouts);
} else {
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
}
1 change: 1 addition & 0 deletions src/msg/async/AsyncConnection.h
@@ -173,6 +173,7 @@ class AsyncConnection : public Connection {
AsyncMessenger *async_msgr;
uint64_t conn_id;
PerfCounters *logger;
PerfCounters *labeled_logger;
int state;
ConnectedSocket cs;
int port;
1 change: 1 addition & 0 deletions src/msg/async/ProtocolV1.cc
@@ -2414,6 +2414,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
existing->worker->references--;
new_worker->references++;
existing->logger = new_worker->get_perf_counter();
existing->labeled_logger = new_worker->get_labeled_perf_counter();
existing->worker = new_worker;
existing->center = new_center;
if (existing->delay_state)
1 change: 1 addition & 0 deletions src/msg/async/ProtocolV2.cc
@@ -2808,6 +2808,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
existing->worker->references--;
new_worker->references++;
existing->logger = new_worker->get_perf_counter();
existing->labeled_logger = new_worker->get_labeled_perf_counter();
existing->worker = new_worker;
existing->center = new_center;
if (existing->delay_state)
46 changes: 42 additions & 4 deletions src/msg/async/Stack.h
@@ -17,10 +17,12 @@
#ifndef CEPH_MSG_ASYNC_STACK_H
#define CEPH_MSG_ASYNC_STACK_H

#include "include/spinlock.h"
#include "common/perf_counters.h"
#include "msg/msg_types.h"
#include "common/perf_counters_key.h"
#include "include/spinlock.h"
#include "msg/async/Event.h"
#include "msg/msg_types.h"
#include <string>

class Worker;
class ConnectedSocketImpl {
@@ -214,6 +216,15 @@ enum {
l_msgr_last,
};

enum {
l_msgr_labeled_first = l_msgr_last + 1,

l_msgr_connection_ready_timeouts,
l_msgr_connection_idle_timeouts,

l_msgr_labeled_last,
};

class Worker {
std::mutex init_lock;
std::condition_variable init_cond;
@@ -224,6 +235,7 @@ class Worker {

CephContext *cct;
PerfCounters *perf_logger;
PerfCounters *perf_labeled_logger;
unsigned id;

std::atomic_uint references;
@@ -233,9 +245,11 @@
Worker& operator=(const Worker&) = delete;

Worker(CephContext *c, unsigned worker_id)
: cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
: cct(c), id(worker_id), references(0), center(c) {
char name[128];
sprintf(name, "AsyncMessenger::Worker-%u", id);
char name_prefix[] = "AsyncMessenger::Worker";
sprintf(name, "%s-%u", name_prefix, id);

// initialize perf_logger
PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);

@@ -259,12 +273,35 @@

perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);

// Add labeled perfcounters
std::string labels = ceph::perf_counters::key_create(
name_prefix, {{"id", std::to_string(id)}});
PerfCountersBuilder plb_labeled(
cct, labels, l_msgr_labeled_first,
l_msgr_labeled_last);

plb_labeled.add_u64_counter(
l_msgr_connection_ready_timeouts, "msgr_connection_ready_timeouts",
"Number of not yet ready connections declared as dead", NULL,
PerfCountersBuilder::PRIO_USEFUL);
plb_labeled.add_u64_counter(
l_msgr_connection_idle_timeouts, "msgr_connection_idle_timeouts",
"Number of connections closed due to idleness", NULL,
PerfCountersBuilder::PRIO_USEFUL);

perf_labeled_logger = plb_labeled.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_labeled_logger);
}
virtual ~Worker() {
if (perf_logger) {
cct->get_perfcounters_collection()->remove(perf_logger);
delete perf_logger;
}
if (perf_labeled_logger) {
cct->get_perfcounters_collection()->remove(perf_labeled_logger);
delete perf_labeled_logger;
}
}

virtual int listen(entity_addr_t &addr, unsigned addr_slot,
@@ -275,6 +312,7 @@

virtual void initialize() {}
PerfCounters *get_perf_counter() { return perf_logger; }
PerfCounters *get_labeled_perf_counter() { return perf_labeled_logger; }
void release_worker() {
int oldref = references.fetch_sub(1);
ceph_assert(oldref > 0);
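
The net effect of the registration above: each worker now publishes two counter sets, the legacy instanced one and a labeled one sharing a single logical name. A short sketch of the two naming styles, assuming only the Ceph-tree header used above:

#include "common/perf_counters_key.h"
#include <string>

// Legacy: the worker id is baked into the set name, so every worker gets
// its own schema entry that consumers cannot aggregate across workers.
const std::string old_style = "AsyncMessenger::Worker-3";

// Labeled: one logical name for all workers, with the id carried as an
// {"id", "3"} label that key_labels() can recover and exporters can filter on.
const std::string labeled = ceph::perf_counters::key_create(
    "AsyncMessenger::Worker", {{"id", "3"}});

This split also lines up with the mgr-side renames below: the Python reporting paths still consume only the legacy, unlabeled sets, hence get_all_perf_counters() becomes get_unlabeled_perf_counters().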
2 changes: 1 addition & 1 deletion src/pybind/mgr/dashboard/controllers/perf_counters.py
@@ -79,4 +79,4 @@ class PerfCounters(RESTController):
@EndpointDoc("Display Perf Counters",
responses={200: PERF_SCHEMA})
def list(self):
return mgr.get_all_perf_counters()
return mgr.get_unlabeled_perf_counters()
2 changes: 1 addition & 1 deletion src/pybind/mgr/influx/module.py
@@ -253,7 +253,7 @@ def get_pg_summary_pool(self, pool_info: Dict[str, str], now: str) -> Iterator[D
}

def get_daemon_stats(self, now: str) -> Iterator[Dict[str, Any]]:
for daemon, counters in self.get_all_perf_counters().items():
for daemon, counters in self.get_unlabeled_perf_counters().items():
svc_type, svc_id = daemon.split(".", 1)
metadata = self.get_metadata(svc_type, svc_id)
if metadata is not None:
2 changes: 1 addition & 1 deletion src/pybind/mgr/mgr_module.py
@@ -2022,7 +2022,7 @@ def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tu

@API.expose
@profile_method()
def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
def get_unlabeled_perf_counters(self, prio_limit: int = PRIO_USEFUL,
services: Sequence[str] = ("mds", "mon", "osd",
"rbd-mirror", "rgw",
"tcmu-runner")) -> Dict[str, dict]:
2 changes: 1 addition & 1 deletion src/pybind/mgr/prometheus/module.py
@@ -1652,7 +1652,7 @@ def get_perf_counters(self) -> None:
"""
Get the perf counters for all daemons
"""
for daemon, counters in self.get_all_perf_counters().items():
for daemon, counters in self.get_unlabeled_perf_counters().items():
for path, counter_info in counters.items():
# Skip histograms, they are represented by long running avgs
stattype = self._stattype_to_str(counter_info['type'])
2 changes: 1 addition & 1 deletion src/pybind/mgr/restful/api/perf.py
@@ -18,7 +18,7 @@ def get(self, **kwargs):
- 'daemon' -- filter by daemon, accepts Python regexp
"""

counters = context.instance.get_all_perf_counters()
counters = context.instance.get_unlabeled_perf_counters()

if 'daemon' in kwargs:
_re = re.compile(kwargs['daemon'])
2 changes: 1 addition & 1 deletion src/pybind/mgr/telegraf/module.py
@@ -72,7 +72,7 @@ def get_pool_stats(self) -> Iterable[Dict[str, Any]]:
}

def get_daemon_stats(self) -> Iterable[Dict[str, Any]]:
for daemon, counters in self.get_all_perf_counters().items():
for daemon, counters in self.get_unlabeled_perf_counters().items():
svc_type, svc_id = daemon.split('.', 1)
metadata = self.get_metadata(svc_type, svc_id)
if not metadata:
22 changes: 11 additions & 11 deletions src/pybind/mgr/telemetry/module.py
@@ -794,7 +794,7 @@ def gather_crashinfo(self) -> List[Dict[str, str]]:
return crashlist

def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
# Extract perf counter data with get_all_perf_counters(), a method
# Extract perf counter data with get_unlabeled_perf_counters(), a method
# from mgr/mgr_module.py. This method returns a nested dictionary that
# looks a lot like perf schema, except with some additional fields.
#
@@ -810,7 +810,7 @@ def gather_perf_counters(self, mode: str = 'separated') -> Dict[str, dict]:
# "value": 88814109
# },
# },
all_perf_counters = self.get_all_perf_counters()
perf_counters = self.get_unlabeled_perf_counters()

# Initialize 'result' dict
result: Dict[str, dict] = defaultdict(lambda: defaultdict(
@@ -819,7 +819,7 @@
# 'separated' mode
anonymized_daemon_dict = {}

for daemon, all_perf_counters_by_daemon in all_perf_counters.items():
for daemon, perf_counters_by_daemon in perf_counters.items():
daemon_type = daemon[0:3] # i.e. 'mds', 'osd', 'rgw'

if mode == 'separated':
@@ -836,7 +836,7 @@
else:
result[daemon_type]['num_combined_daemons'] += 1

for collection in all_perf_counters_by_daemon:
for collection in perf_counters_by_daemon:
# Split the collection to avoid redundancy in final report; i.e.:
# bluestore.kv_flush_lat, bluestore.kv_final_lat -->
# bluestore: kv_flush_lat, kv_final_lat
@@ -856,12 +856,12 @@
if mode == 'separated':
# Add value to result
result[daemon][col_0][col_1]['value'] = \
all_perf_counters_by_daemon[collection]['value']
perf_counters_by_daemon[collection]['value']

# Check that 'count' exists, as not all counters have a count field.
if 'count' in all_perf_counters_by_daemon[collection]:
if 'count' in perf_counters_by_daemon[collection]:
result[daemon][col_0][col_1]['count'] = \
all_perf_counters_by_daemon[collection]['count']
perf_counters_by_daemon[collection]['count']
elif mode == 'aggregated':
# Not every rgw daemon has the same schema. Specifically, each rgw daemon
# has a uniquely-named collection that starts off identically (i.e.
@@ -875,14 +875,14 @@
# the files are of type 'pair' (real-integer-pair, integer-integer pair).
# In those cases, the value is a dictionary, and not a number.
# i.e. throttle-msgr_dispatch_throttler-hbserver["wait"]
if isinstance(all_perf_counters_by_daemon[collection]['value'], numbers.Number):
if isinstance(perf_counters_by_daemon[collection]['value'], numbers.Number):
result[daemon_type][col_0][col_1]['value'] += \
all_perf_counters_by_daemon[collection]['value']
perf_counters_by_daemon[collection]['value']

# Check that 'count' exists, as not all counters have a count field.
if 'count' in all_perf_counters_by_daemon[collection]:
if 'count' in perf_counters_by_daemon[collection]:
result[daemon_type][col_0][col_1]['count'] += \
all_perf_counters_by_daemon[collection]['count']
perf_counters_by_daemon[collection]['count']
else:
self.log.error('Incorrect mode specified in gather_perf_counters: {}'.format(mode))
return {}