diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in
index 48c6788a88b327..ceca840a3a8f10 100644
--- a/src/common/options/global.yaml.in
+++ b/src/common/options/global.yaml.in
@@ -6186,6 +6186,54 @@ options:
   level: dev
   desc: Time to wait during shutdown to deregister service with mgr
   default: 1
+- name: mgr_enable_op_tracker
+  type: bool
+  level: advanced
+  desc: Enable / disable MGR Op Tracker
+  default: true
+  with_legacy: true
+- name: mgr_num_op_tracker_shard
+  type: uint
+  level: advanced
+  desc: The number of shards for holding the ops
+  default: 32
+  with_legacy: true
+- name: mgr_op_complaint_time
+  type: float
+  level: advanced
+  default: 30
+  desc: An operation becomes complaint worthy after the specified number of seconds have elapsed.
+  with_legacy: true
+- name: mgr_op_log_threshold
+  type: int
+  level: advanced
+  default: 5
+  desc: How many operations logs to display at once.
+  with_legacy: true
+- name: mgr_op_history_size
+  type: uint
+  level: advanced
+  default: 20
+  desc: The maximum number of completed operations to track.
+  with_legacy: true
+- name: mgr_op_history_duration
+  type: uint
+  level: advanced
+  default: 600
+  desc: The oldest completed operation to track.
+  with_legacy: true
+- name: mgr_op_history_slow_op_size
+  type: uint
+  level: advanced
+  default: 20
+  desc: Max number of slow ops to track
+  with_legacy: true
+- name: mgr_op_history_slow_op_threshold
+  type: float
+  level: advanced
+  default: 10
+  desc: Track the op if over this threshold age in seconds
+  with_legacy: true
 - name: throttler_perf_counter
   type: bool
   level: advanced
diff --git a/src/mgr/CMakeLists.txt b/src/mgr/CMakeLists.txt
index f9ec04317f4be6..4f831e152ff81f 100644
--- a/src/mgr/CMakeLists.txt
+++ b/src/mgr/CMakeLists.txt
@@ -32,6 +32,8 @@ if(WITH_MGR)
     PyOSDMap.cc
     StandbyPyModules.cc
     mgr_commands.cc
+    MgrOpRequest.cc
+    ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
     $)
   add_executable(ceph-mgr ${mgr_srcs})
   target_compile_definitions(ceph-mgr PRIVATE PY_SSIZE_T_CLEAN)
@@ -50,5 +52,8 @@ if(WITH_MGR)
     ${GSSAPI_LIBRARIES})
   set_target_properties(ceph-mgr PROPERTIES
     POSITION_INDEPENDENT_CODE ${EXE_LINKER_USE_PIE})
+  if(WITH_LTTNG)
+    add_dependencies(ceph-mgr mgr_op_tp)
+  endif()
   install(TARGETS ceph-mgr DESTINATION bin)
 endif()
diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc
index b1781316f82814..e09aa747b439b4 100644
--- a/src/mgr/DaemonServer.cc
+++ b/src/mgr/DaemonServer.cc
@@ -24,6 +24,7 @@
 #include "mgr/DaemonHealthMetricCollector.h"
 #include "mgr/OSDPerfMetricCollector.h"
 #include "mgr/MDSPerfMetricCollector.h"
+#include "mgr/MgrOpRequest.h"
 #include "mon/MonCommand.h"
 
 #include "messages/MMgrOpen.h"
@@ -49,6 +50,7 @@
 using namespace TOPNSPC::common;
 
 using std::list;
+using std::ostream;
 using std::ostringstream;
 using std::string;
 using std::stringstream;
@@ -96,15 +98,25 @@ DaemonServer::DaemonServer(MonClient *monc_,
       py_modules(py_modules_),
       clog(clog_),
       audit_clog(audit_clog_),
+      asok_hook(nullptr),
       pgmap_ready(false),
       timer(g_ceph_context, lock),
       tick_event(nullptr),
       osd_perf_metric_collector_listener(this),
       osd_perf_metric_collector(osd_perf_metric_collector_listener),
       mds_perf_metric_collector_listener(this),
-      mds_perf_metric_collector(mds_perf_metric_collector_listener)
+      mds_perf_metric_collector(mds_perf_metric_collector_listener),
+      op_tracker(g_ceph_context, g_ceph_context->_conf->mgr_enable_op_tracker,
+                 g_ceph_context->_conf->mgr_num_op_tracker_shard)
 {
   g_conf().add_observer(this);
+  /* define op size and time for mgr daemon */
+  op_tracker.set_complaint_and_threshold(cct->_conf->mgr_op_complaint_time,
+                                         cct->_conf->mgr_op_log_threshold);
+  op_tracker.set_history_size_and_duration(cct->_conf->mgr_op_history_size,
+                                           cct->_conf->mgr_op_history_duration);
+  op_tracker.set_history_slow_op_size_and_threshold(cct->_conf->mgr_op_history_slow_op_size,
+                                                    cct->_conf->mgr_op_history_slow_op_threshold);
 }
 
 DaemonServer::~DaemonServer() {
@@ -112,6 +124,31 @@ DaemonServer::~DaemonServer() {
   g_conf().remove_observer(this);
 }
 
+class DaemonServerHook : public AdminSocketHook {
+  DaemonServer *daemon_server;
+public:
+  explicit DaemonServerHook(DaemonServer *o) : daemon_server(o) {}
+  int call(std::string_view admin_command,
+           const cmdmap_t& cmdmap,
+           const bufferlist&,
+           Formatter *f,
+           std::ostream& errss,
+           bufferlist& out) override {
+    stringstream outss;
+    int r = 0;
+    try {
+      // asok_command() reports success as a bool; convert it to the same
+      // 0 / -EINVAL convention the catch branch below uses.
+      r = daemon_server->asok_command(admin_command, cmdmap, f, outss) ? 0 : -EINVAL;
+      out.append(outss);
+    } catch (const TOPNSPC::common::bad_cmd_get& e) {
+      errss << e.what();
+      r = -EINVAL;
+    }
+    return r;
+  }
+};
+
 int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
 {
   // Initialize Messenger
@@ -180,6 +217,40 @@ int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
   schedule_tick_locked(
     g_conf().get_val("mgr_tick_period").count());
 
+  op_tracker.set_tracking(cct->_conf->mgr_enable_op_tracker);
+
+  AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
+  asok_hook = new DaemonServerHook(this);
+  r = admin_socket->register_command("dump_ops_in_flight " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
+                                     asok_hook,
+                                     "show the ops currently in flight");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_blocked_ops " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
+                                     asok_hook,
+                                     "show the blocked ops currently in flight");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_blocked_ops_count " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
+                                     asok_hook,
+                                     "show the count of blocked ops currently in flight");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_ops " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
+                                     asok_hook,
+                                     "show recent ops");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_slow_ops " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
+                                     asok_hook,
+                                     "show slowest recent ops");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("dump_historic_ops_by_duration " \
+                                     "name=filterstr,type=CephString,n=N,req=false",
+                                     asok_hook,
+                                     "show slowest recent ops, sorted by duration");
+  ceph_assert(r == 0);
   return 0;
 }
 
@@ -1067,6 +1138,8 @@ bool DaemonServer::_handle_command(
   } else {
     m = cmdctx->m_mgr;
   }
+  MessageRef mref(m.get());
+  MgrOpRequestRef op = op_tracker.create_request<MgrOpRequest>(mref);
   auto priv = m->get_connection()->get_priv();
   auto session = static_cast(priv.get());
   if (!session) {
@@ -1076,6 +1149,7 @@ bool DaemonServer::_handle_command(
     session->inst.name = m->get_source();
   }
 
+  op->mark_started();
   map param_str_map;
   std::stringstream ss;
   int r = 0;
@@ -2439,9 +2513,12 @@ bool DaemonServer::_handle_command(
     return true;
   }
 
+  op->mark_queued_for_module();
+
   dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
   Finisher& mod_finisher = py_modules.get_active_module_finisher(mod_name);
-  mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
+
+  mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix, op]
                                        (int r_) mutable {
     std::stringstream ss;
 
@@ -2494,6 +2571,7 @@ bool DaemonServer::_handle_command(
 
     std::stringstream ds;
     bufferlist inbl = cmdctx->data;
+    op->mark_reached(py_command.module_name.c_str());
     int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
                                       inbl, &ds, &ss);
     if (r == -EACCES) {
@@ -3131,3 +3209,57 @@ int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
 {
   return mds_perf_metric_collector.get_counters(collector);
 }
+
+/**
+ * Serve the op tracker admin socket commands registered in init().
+ *
+ * @param admin_command command name matched by the admin socket
+ * @param cmdmap parsed arguments; the optional "filterstr" values are
+ *               handed to the tracker as its filter set
+ * @param f formatter the tracker dumps into
+ * @param ss receives an explanation when the dump fails
+ * @return false when the tracker refuses to dump, true otherwise
+ *         (including for commands this function does not handle)
+ */
+bool DaemonServer::asok_command(
+  std::string_view admin_command,
+  const cmdmap_t& cmdmap,
+  Formatter *f,
+  ostream& ss)
+{
+  std::lock_guard l(lock);
+
+  std::set<std::string> filters;
+  std::vector<std::string> filter_str;
+  if (cmd_getval(cmdmap, "filterstr", filter_str)) {
+    filters.insert(filter_str.begin(), filter_str.end());
+  }
+
+  bool dumped = false;
+  if (admin_command == "dump_ops_in_flight") {
+    dumped = op_tracker.dump_ops_in_flight(f, false, filters);
+  } else if (admin_command == "dump_blocked_ops") {
+    dumped = op_tracker.dump_ops_in_flight(f, true, filters);
+  } else if (admin_command == "dump_blocked_ops_count") {
+    dumped = op_tracker.dump_ops_in_flight(f, true, filters, true);
+  } else if (admin_command == "dump_historic_ops") {
+    dumped = op_tracker.dump_historic_ops(f, false, filters);
+  } else if (admin_command == "dump_historic_ops_by_duration") {
+    dumped = op_tracker.dump_historic_ops(f, true, filters);
+  } else if (admin_command == "dump_historic_slow_ops") {
+    dumped = op_tracker.dump_historic_slow_ops(f, filters);
+  } else {
+    // not an op tracker command: nothing to dump, nothing failed
+    return true;
+  }
+
+  const int ret = dumped ? 0 : -EINVAL;
+  dout(10) << "ret := " << ret << dendl;
+  if (!dumped) {
+    ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, "
+          "even those get stuck. Please enable \"mgr_enable_op_tracker\", and the tracker "
+          "will start to track new ops received afterwards.";
+    return false;
+  }
+  return true;
+}
diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h
index 43125533e745a8..998cc7c8385a89 100644
--- a/src/mgr/DaemonServer.h
+++ b/src/mgr/DaemonServer.h
@@ -33,6 +33,7 @@
 #include "MetricCollector.h"
 #include "OSDPerfMetricCollector.h"
 #include "MDSPerfMetricCollector.h"
+#include "MgrOpRequest.h"
 
 class MMgrReport;
 class MMgrOpen;
@@ -117,7 +118,6 @@ struct offline_pg_report {
   }
 };
 
-
 /**
  * Server used in ceph-mgr to communicate with Ceph daemons like
  * MDSs and OSDs.
@@ -165,6 +165,8 @@ class DaemonServer : public Dispatcher, public md_config_obs_t
     const std::map& param_str_map,
     const MonCommand *this_cmd);
 
+  class DaemonServerHook *asok_hook;
+
 private:
   friend class ReplyOnFinish;
   bool _reply(MCommand* m,
@@ -251,6 +253,9 @@ class DaemonServer : public Dispatcher, public md_config_obs_t
   void update_task_status(
     DaemonKey key,
     const std::map& task_status);
+private:
+  // -- op tracking --
+  OpTracker op_tracker;
 
 public:
   int init(uint64_t gid, entity_addrvec_t client_addrs);
@@ -309,6 +314,11 @@ class DaemonServer : public Dispatcher, public md_config_obs_t
   void log_access_denied(std::shared_ptr& cmdctx,
                          MgrSession* session, std::stringstream& ss);
   void dump_pg_ready(ceph::Formatter *f);
+
+  bool asok_command(std::string_view admin_command,
+                    const cmdmap_t& cmdmap,
+                    Formatter *f,
+                    std::ostream& ss);
 };
 
 #endif
diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc
index 5bd2ffb246c74b..309c369478007d 100644
--- a/src/mgr/Mgr.cc
+++ b/src/mgr/Mgr.cc
@@ -27,6 +27,7 @@
 #endif
 
 #include "mgr/MgrContext.h"
+#include "mgr/MgrOpRequest.h"
 
 #include "DaemonServer.h"
 #include "messages/MMgrDigest.h"
@@ -66,7 +67,9 @@ Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
   clog(clog_),
   audit_clog(audit_clog_),
   initialized(false),
-  initializing(false)
+  initializing(false),
+  op_tracker(g_ceph_context, g_ceph_context->_conf->mgr_enable_op_tracker,
+             g_ceph_context->_conf->mgr_num_op_tracker_shard)
 {
   cluster_state.set_objecter(objecter);
 }
diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h
index 65931c331f3638..bb403092504994 100644
--- a/src/mgr/Mgr.h
+++ b/src/mgr/Mgr.h
@@ -22,6 +22,7 @@
 #include "msg/Messenger.h"
 #include "auth/Auth.h"
 #include "common/Finisher.h"
+#include "common/TrackedOp.h"
 #include "mon/MgrMap.h"
 
 #include "DaemonServer.h"
@@ -70,6 +71,10 @@ class Mgr : public AdminSocketHook {
   bool initialized;
   bool initializing;
 
+private:
+  // -- op tracking --
+  OpTracker op_tracker;
+
 public:
   Mgr(MonClient *monc_, const MgrMap& mgrmap,
       PyModuleRegistry *py_module_registry_,
diff --git a/src/mgr/MgrOpRequest.cc b/src/mgr/MgrOpRequest.cc
new file mode 100644
index 00000000000000..4d1ee1ae0e7225
--- /dev/null
+++ b/src/mgr/MgrOpRequest.cc
@@ -0,0 +1,142 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "MgrOpRequest.h"
+#include <iostream>
+#include <vector>
+#include "common/debug.h"
+#include "common/config.h"
+#include "common/Formatter.h"
+#include "include/ceph_assert.h"
+#include "msg/Message.h"
+
+#ifdef WITH_LTTNG
+#define TRACEPOINT_DEFINE
+#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
+#include "tracing/mgroprequest.h"
+#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
+#undef TRACEPOINT_DEFINE
+#else
+#define tracepoint(...)
+#endif
+
+using std::ostream;
+using std::set;
+using std::string;
+using std::stringstream;
+
+using ceph::Formatter;
+
+// Track @p req; its receive stamp and source instance are captured here.
+MgrOpRequest::MgrOpRequest(MessageRef req, OpTracker* tracker)
+  : TrackedOp(tracker, req->get_recv_stamp()),
+    request(req) {
+  req_src_inst = req->get_source_inst();
+}
+
+// Dump the current flag point, the client (if the message came from one)
+// and every recorded event with the time elapsed since the previous event.
+void MgrOpRequest::_dump(Formatter *f) const
+{
+  MessageRef m = request;
+  f->dump_string("flag_point", state_string());
+  if (m->get_orig_source().is_client()) {
+    f->open_object_section("client_info");
+    stringstream client_name, client_addr;
+    client_name << req_src_inst.name;
+    client_addr << req_src_inst.addr;
+    f->dump_string("client", client_name.str());
+    f->dump_string("client_addr", client_addr.str());
+    f->dump_unsigned("tid", m->get_tid());
+    f->close_section(); // client_info
+  }
+
+  {
+    f->open_array_section("events");
+    std::lock_guard l(lock);
+
+    for (auto i = events.begin(); i != events.end(); ++i) {
+      f->open_object_section("event");
+      f->dump_string("event", i->str);
+      f->dump_stream("time") << i->stamp;
+
+      double duration = 0;
+
+      if (i != events.begin()) {
+        auto i_prev = i - 1;
+        duration = i->stamp - i_prev->stamp;
+      }
+
+      f->dump_float("duration", duration);
+      f->close_section();
+    }
+    f->close_section();
+  }
+}
+
+// Describe the op by printing the tracked message.
+void MgrOpRequest::_dump_op_descriptor(ostream& stream) const
+{
+  get_req()->print(stream);
+}
+
+// Release the message's data, payload, throttle budget and connection.
+void MgrOpRequest::_unregistered() {
+  request->clear_data();
+  request->clear_payload();
+  request->release_message_throttle();
+  request->set_connection(nullptr);
+}
+
+// Record that flag point @p flag was reached, with @p s as the event text.
+void MgrOpRequest::mark_flag_point(uint8_t flag, const char *s) {
+  mark_flag_point_string(flag, s);
+}
+
+// Same as mark_flag_point(), for callers that already have a std::string.
+void MgrOpRequest::mark_flag_point_string(uint8_t flag, const string& s) {
+  [[maybe_unused]] uint8_t old_flags = hit_flag_points;
+  mark_event(s);
+  {
+    // NOTE(review): assumes readers of last_event_detail hold `lock`
+    // (as _dump() does for `events`) -- confirm against TrackedOp.
+    std::lock_guard l(lock);
+    last_event_detail = s;
+  }
+  hit_flag_points |= flag;
+  latest_flag_point = flag;
+
+  tracepoint(mgroprequest, mark_flag_point,
+             flag, s.c_str(), old_flags, hit_flag_points);
+}
+
+// Compare the op's source address with the entries of @p filters that
+// parse as addresses: first exactly, then ignoring the nonce, then ignoring
+// nonce and port. Returns true on a match or when no entry parses as an
+// address, false otherwise.
+bool MgrOpRequest::filter_out(const set<string>& filters)
+{
+  set<entity_addr_t> addrs;
+  for (const auto& filter : filters) {
+    entity_addr_t addr;
+    if (addr.parse(filter.c_str())) {
+      addrs.insert(addr);
+    }
+  }
+  if (addrs.empty())
+    return true;
+
+  entity_addr_t cmp_addr = req_src_inst.addr;
+  if (addrs.count(cmp_addr)) {
+    return true;
+  }
+  cmp_addr.set_nonce(0);
+  if (addrs.count(cmp_addr)) {
+    return true;
+  }
+  cmp_addr.set_port(0);
+  if (addrs.count(cmp_addr)) {
+    return true;
+  }
+
+  return false;
+}
diff --git a/src/mgr/MgrOpRequest.h b/src/mgr/MgrOpRequest.h
new file mode 100644
index 00000000000000..f71662078a5ba8
--- /dev/null
+++ b/src/mgr/MgrOpRequest.h
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef MGROPREQUEST_H_
+#define MGROPREQUEST_H_
+
+#include <set>
+#include <string>
+
+#include "common/TrackedOp.h"
+#include "common/tracer.h"
+
+/**
+ * Tracks one message handled by ceph-mgr.
+ *
+ * The MgrOpRequest takes in a MessageRef and keeps a single reference to
+ * the message for as long as the op exists; the MessageRef member gives
+ * that reference back when the op is destroyed.
+ */
+struct MgrOpRequest : public TrackedOp {
+  friend class OpTracker;
+
+public:
+  void _dump(ceph::Formatter *f) const override;
+
+private:
+  MessageRef request; /// the logical request we are tracking
+  entity_inst_t req_src_inst;
+  uint8_t hit_flag_points = 0; /// every flag point reached so far, OR-ed together
+  uint8_t latest_flag_point = 0; /// most recently reached flag point, 0 if none
+  /// Text of the latest event. Stored by value so that it stays valid after
+  /// the buffer the caller passed to mark_reached() has gone away.
+  std::string last_event_detail;
+
+  static const uint8_t flag_started = 1 << 0;
+  static const uint8_t flag_queued_for_module = 1 << 1;
+  static const uint8_t flag_reached_module = 1 << 2;
+
+  MgrOpRequest(MessageRef req, OpTracker *tracker);
+
+protected:
+  void _dump_op_descriptor(std::ostream& stream) const override;
+  void _unregistered() override;
+  bool filter_out(const std::set<std::string>& filters) override;
+
+public:
+  // No explicit request->put(): `request` is a reference-counting handle
+  // and drops its reference itself; doing both would release it twice.
+  ~MgrOpRequest() override = default;
+
+  template<class T>
+  const T* get_req() const { return static_cast<const T*>(request.get()); }
+
+  const MessageRef get_req() const { return request; }
+  MessageRef get_nonconst_req() { return request; }
+
+  entity_name_t get_source() {
+    if (request) {
+      return request->get_source();
+    } else {
+      return {};
+    }
+  }
+  uint8_t state_flag() const {
+    return latest_flag_point;
+  }
+
+  std::string _get_state_string() const override {
+    switch(latest_flag_point) {
+    case flag_started: return "started";
+    case flag_queued_for_module: return "queued for module";
+    case flag_reached_module: return last_event_detail;
+    default: break;
+    }
+    return "no flag points reached";
+  }
+
+  static std::string get_state_string(uint8_t flag) {
+    std::string flag_point;
+    switch(flag) {
+    case flag_started:
+      flag_point = "started";
+      break;
+    case flag_queued_for_module:
+      flag_point = "queued for module";
+      break;
+    case flag_reached_module:
+      flag_point = "reached module";
+      break;
+    }
+    return flag_point;
+  }
+
+  void mark_started() {
+    mark_flag_point(flag_started, "started");
+  }
+  void mark_queued_for_module() {
+    mark_flag_point(flag_queued_for_module, "queued_for_module");
+  }
+  void mark_reached(const char *s) {
+    mark_flag_point(flag_reached_module, s);
+  }
+
+  typedef boost::intrusive_ptr<MgrOpRequest> Ref;
+
+private:
+  void mark_flag_point(uint8_t flag, const char *s);
+  void mark_flag_point_string(uint8_t flag, const std::string& s);
+};
+
+typedef MgrOpRequest::Ref MgrOpRequestRef;
+
+#endif /* MGROPREQUEST_H_ */
diff --git a/src/tracing/CMakeLists.txt b/src/tracing/CMakeLists.txt
index 0044299a44bbac..991640bb6cf48a 100644
--- a/src/tracing/CMakeLists.txt
+++ b/src/tracing/CMakeLists.txt
@@ -48,8 +48,9 @@ add_tracing_library(os_tp objectstore.tp 1.0.0)
 add_tracing_library(bluestore_tp bluestore.tp 1.0.0)
 add_tracing_library(rgw_op_tp rgw_op.tp 2.0.0)
 add_tracing_library(rgw_rados_tp rgw_rados.tp 2.0.0)
+add_tracing_library(mgr_op_tp mgroprequest.tp 1.0.0)
 
-install(TARGETS rados_tp osd_tp os_tp rgw_rados_tp rgw_op_tp DESTINATION ${CMAKE_INSTALL_LIBDIR})
+install(TARGETS rados_tp osd_tp os_tp rgw_rados_tp rgw_op_tp mgr_op_tp DESTINATION ${CMAKE_INSTALL_LIBDIR})
 if(WITH_RBD)
   add_tracing_library(rbd_tp librbd.tp 1.0.0)
   install(TARGETS rbd_tp DESTINATION ${CMAKE_INSTALL_LIBDIR})
diff --git a/src/tracing/mgroprequest.c b/src/tracing/mgroprequest.c
new file mode 100644
index 00000000000000..d56c8e5ba21790
--- /dev/null
+++ b/src/tracing/mgroprequest.c
@@ -0,0 +1,5 @@
+#define TRACEPOINT_CREATE_PROBES
+/*
+ * The header containing our TRACEPOINT_EVENTs.
+ */
+#include "tracing/mgroprequest.h"
diff --git a/src/tracing/mgroprequest.tp b/src/tracing/mgroprequest.tp
new file mode 100644
index 00000000000000..06d43f37698494
--- /dev/null
+++ b/src/tracing/mgroprequest.tp
@@ -0,0 +1,27 @@
+#include "include/int_types.h"
+
+TRACEPOINT_EVENT(mgroprequest, set_rmw_flags,
+    TP_ARGS(
+        int, flag,
+        int, old_rmw_flags,
+        int, new_rmw_flags),
+    TP_FIELDS(
+        ctf_integer_hex(int, flag, flag)
+        ctf_integer_hex(int, old_rmw_flags, old_rmw_flags)
+        ctf_integer_hex(int, new_rmw_flags, new_rmw_flags)
+    )
+)
+
+TRACEPOINT_EVENT(mgroprequest, mark_flag_point,
+    TP_ARGS(
+        uint8_t, flag,
+        const char*, msg,
+        uint8_t, old_hit_flag_points,
+        uint8_t, new_hit_flag_points),
+    TP_FIELDS(
+        ctf_integer_hex(uint8_t, flag, flag)
+        ctf_string(msg, msg)
+        ctf_integer_hex(uint8_t, old_hit_flag_points, old_hit_flag_points)
+        ctf_integer_hex(uint8_t, new_hit_flag_points, new_hit_flag_points)
+    )
+)