Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

squid: mgr: integrate optracker in ceph-mgr #57382

Open
wants to merge 1 commit into
base: squid
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions ceph.spec.in
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,7 @@ rm -rf %{_vpath_builddir}
%if %{with lttng}
%{_libdir}/libos_tp.so*
%{_libdir}/libosd_tp.so*
%{_libdir}/libmgr_op_tp.so*
%endif
%config(noreplace) %{_sysconfdir}/logrotate.d/ceph
%if 0%{?fedora} || 0%{?rhel} || 0%{?openEuler}
Expand Down
48 changes: 48 additions & 0 deletions src/common/options/global.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -6205,6 +6205,54 @@ options:
level: dev
desc: Time to wait during shutdown to deregister service with mgr
default: 1
- name: mgr_enable_op_tracker
type: bool
level: advanced
desc: Enable / disable MGR Op Tracker
default: true
with_legacy: true
- name: mgr_num_op_tracker_shard
type: uint
level: advanced
desc: The number of shards for holding the ops
default: 32
with_legacy: true
- name: mgr_op_complaint_time
type: float
level: advanced
default: 30
desc: An operation becomes complaint worthy after the specified number of seconds have elapsed.
with_legacy: true
- name: mgr_op_log_threshold
type: int
level: advanced
default: 5
fmt_desc: How many operations logs to display at once.
with_legacy: true
- name: mgr_op_history_size
type: uint
level: advanced
default: 20
fmt_desc: The maximum number of completed operations to track.
with_legacy: true
- name: mgr_op_history_duration
type: uint
level: advanced
default: 600
desc: The oldest completed operation to track.
with_legacy: true
- name: mgr_op_history_slow_op_size
type: uint
level: advanced
default: 20
desc: Max number of slow ops to track
with_legacy: true
- name: mgr_op_history_slow_op_threshold
type: float
level: advanced
default: 10
desc: Duration of an op to be considered as a historical slow op
with_legacy: true
- name: throttler_perf_counter
type: bool
level: advanced
Expand Down
5 changes: 5 additions & 0 deletions src/mgr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ if(WITH_MGR)
PyOSDMap.cc
StandbyPyModules.cc
mgr_commands.cc
MgrOpRequest.cc
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
$<TARGET_OBJECTS:mgr_cap_obj>)
add_executable(ceph-mgr ${mgr_srcs})
target_compile_definitions(ceph-mgr PRIVATE PY_SSIZE_T_CLEAN)
Expand All @@ -50,5 +52,8 @@ if(WITH_MGR)
${GSSAPI_LIBRARIES})
set_target_properties(ceph-mgr PROPERTIES
POSITION_INDEPENDENT_CODE ${EXE_LINKER_USE_PIE})
if(WITH_LTTNG)
add_dependencies(ceph-mgr mgr_op_tp)
endif()
install(TARGETS ceph-mgr DESTINATION bin)
endif()
179 changes: 170 additions & 9 deletions src/mgr/DaemonServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "mgr/DaemonHealthMetricCollector.h"
#include "mgr/OSDPerfMetricCollector.h"
#include "mgr/MDSPerfMetricCollector.h"
#include "mgr/MgrOpRequest.h"
#include "mon/MonCommand.h"

#include "messages/MMgrOpen.h"
Expand All @@ -49,6 +50,7 @@
using namespace TOPNSPC::common;

using std::list;
using std::ostream;
using std::ostringstream;
using std::string;
using std::stringstream;
Expand Down Expand Up @@ -96,22 +98,55 @@ DaemonServer::DaemonServer(MonClient *monc_,
py_modules(py_modules_),
clog(clog_),
audit_clog(audit_clog_),
asok_hook(nullptr),
pgmap_ready(false),
timer(g_ceph_context, lock),
tick_event(nullptr),
osd_perf_metric_collector_listener(this),
osd_perf_metric_collector(osd_perf_metric_collector_listener),
mds_perf_metric_collector_listener(this),
mds_perf_metric_collector(mds_perf_metric_collector_listener)
mds_perf_metric_collector(mds_perf_metric_collector_listener),
op_tracker(g_ceph_context, g_ceph_context->_conf->mgr_enable_op_tracker,
g_ceph_context->_conf->mgr_num_op_tracker_shard)
{
g_conf().add_observer(this);
/* define op size and time for mgr daemon */
op_tracker.set_complaint_and_threshold(cct->_conf->mgr_op_complaint_time,
cct->_conf->mgr_op_log_threshold);
op_tracker.set_history_size_and_duration(cct->_conf->mgr_op_history_size,
cct->_conf->mgr_op_history_duration);
op_tracker.set_history_slow_op_size_and_threshold(cct->_conf->mgr_op_history_slow_op_size,
cct->_conf->mgr_op_history_slow_op_threshold);
}

// Destructor: frees the messenger and removes this object from the config
// observer list it joined in the constructor (g_conf().add_observer(this)).
// NOTE(review): asok_hook is allocated with `new` in init() but is neither
// unregistered from the admin socket nor deleted here -- confirm it is
// cleaned up elsewhere (e.g. on shutdown), otherwise it leaks and the admin
// socket keeps a hook pointing at a destroyed DaemonServer.
DaemonServer::~DaemonServer() {
delete msgr;
g_conf().remove_observer(this);
}

/**
 * Admin socket hook that forwards commands to DaemonServer::asok_command().
 *
 * The hook does not own the DaemonServer; it only keeps a back-pointer.
 */
class DaemonServerHook : public AdminSocketHook {
  DaemonServer *daemon_server;
public:
  explicit DaemonServerHook(DaemonServer *o) : daemon_server(o) {}

  /**
   * Run one admin socket command.
   *
   * @param admin_command command prefix as registered with the admin socket
   * @param cmdmap        parsed command arguments
   * @param f             formatter the command writes its dump into
   * @param errss         receives the message of a bad_cmd_get exception
   * @param out           receives whatever text the command produced
   * @return 0 on success, -EINVAL if the command failed or an argument
   *         could not be read
   */
  int call(std::string_view admin_command,
	   const cmdmap_t& cmdmap,
	   const bufferlist&,
	   Formatter *f,
	   std::ostream& errss,
	   bufferlist& out) override {
    stringstream outss;
    int r = 0;
    try {
      // asok_command() reports success as a bool. Assigning it straight to
      // the int result would turn failure into 0 ("success") and success
      // into 1, so translate it into the usual 0 / negative errno form.
      if (!daemon_server->asok_command(admin_command, cmdmap, f, outss)) {
	r = -EINVAL;
      }
      // Any text produced (including the failure explanation) goes to `out`.
      out.append(outss);
    } catch (const TOPNSPC::common::bad_cmd_get& e) {
      errss << e.what();
      r = -EINVAL;
    }
    return r;
  }
};

int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
// Initialize Messenger
Expand Down Expand Up @@ -180,6 +215,40 @@ int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
schedule_tick_locked(
g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());

op_tracker.set_tracking(cct->_conf->mgr_enable_op_tracker);

AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
asok_hook = new DaemonServerHook(this);
r = admin_socket->register_command("dump_ops_in_flight " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show the ops currently in flight");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_blocked_ops " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show the blocked ops currently in flight");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_blocked_ops_count " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show the count of blocked ops currently in flight");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_historic_ops " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show recent ops");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_historic_slow_ops " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show slowest recent ops");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_historic_ops_by_duration " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show slowest recent ops, sorted by duration");
ceph_assert(r == 0);
return 0;
}

Expand Down Expand Up @@ -867,14 +936,21 @@ class CommandContext {
*/
class ReplyOnFinish : public Context {
std::shared_ptr<CommandContext> cmdctx;
MgrOpRequestRef op;

public:
bufferlist from_mon;
string outs;

explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
: cmdctx(cmdctx_)
{}
// Bind this completion to the command context and, optionally, to the
// tracked op for the command (op_ may be null, hence the check below).
// NOTE(review): mark_finish_mon_command() is called here, at construction
// time. The visible callers build this object right after
// op->mark_start_mon_command() and *before* monc->start_mon_command(), so
// the "finish" event is recorded before the mon command is even sent.
// Presumably it was meant to be recorded from finish() -- confirm.
explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_,
MgrOpRequestRef op_)
: cmdctx(cmdctx_),
op(op_)
{
if (op) {
op->mark_finish_mon_command();
}
}
void finish(int r) override {
cmdctx->odata.claim_append(from_mon);
cmdctx->reply(r, outs);
Expand Down Expand Up @@ -1181,6 +1257,12 @@ bool DaemonServer::_handle_command(
return true;
}

// Track non-admin mgr ops only
MessageRef mref = m.get();
MgrOpRequestRef op = op_tracker.create_request<MgrOpRequest, MessageRef>(mref);

op->mark_started();

// ----------------
// service map commands
if (prefix == "service dump") {
Expand Down Expand Up @@ -1519,7 +1601,8 @@ bool DaemonServer::_handle_command(
"\"prefix\": \"osd reweightn\", "
"\"weights\": \"" + s + "\""
"}";
auto on_finish = new ReplyOnFinish(cmdctx);
op->mark_start_mon_command();
auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, {},
&on_finish->from_mon, &on_finish->outs, on_finish);
return true;
Expand Down Expand Up @@ -1749,7 +1832,8 @@ bool DaemonServer::_handle_command(
"\"id\": " + stringify(osds) + ", "
"\"yes_i_really_mean_it\": true"
"}";
auto on_finish = new ReplyOnFinish(cmdctx);
op->mark_start_mon_command();
auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
return true;
} else if (prefix == "osd ok-to-stop") {
Expand Down Expand Up @@ -2386,7 +2470,8 @@ bool DaemonServer::_handle_command(
"\"prefix\": \"config-key set\", "
"\"key\": \"device/" + devid + "\""
"}";
auto on_finish = new ReplyOnFinish(cmdctx);
op->mark_start_mon_command();
auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
}
return true;
Expand Down Expand Up @@ -2418,7 +2503,8 @@ bool DaemonServer::_handle_command(
"\"key\": \"device/" + devid + "\""
"}";
}
auto on_finish = new ReplyOnFinish(cmdctx);
op->mark_start_mon_command();
auto on_finish = new ReplyOnFinish(cmdctx, op);
monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
} else {
cmdctx->reply(0, ss);
Expand Down Expand Up @@ -2470,9 +2556,12 @@ bool DaemonServer::_handle_command(
return true;
}

op->mark_queued_for_module();

dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
Finisher& mod_finisher = py_modules.get_active_module_finisher(mod_name);
mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]

mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix, op]
(int r_) mutable {
std::stringstream ss;

Expand Down Expand Up @@ -2525,6 +2614,7 @@ bool DaemonServer::_handle_command(

std::stringstream ds;
bufferlist inbl = cmdctx->data;
op->mark_reached(py_command.module_name.c_str());
int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
inbl, &ds, &ss);
if (r == -EACCES) {
Expand Down Expand Up @@ -3162,3 +3252,74 @@ int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
{
return mds_perf_metric_collector.get_counters(collector);
}

bool DaemonServer::asok_command(
std::string_view admin_command,
const cmdmap_t& cmdmap,
Formatter *f,
ostream& ss)
{
int ret = 0;
std::lock_guard l(lock);
if (admin_command == "dump_ops_in_flight" ||
admin_command == "dump_blocked_ops" ||
admin_command == "dump_blocked_ops_count" ||
admin_command == "dump_historic_ops" ||
admin_command == "dump_historic_ops_by_duration" ||
admin_command == "dump_historic_slow_ops") {

const string error_str = "op_tracker tracking is not enabled now, so no ops are tracked currently, \
even those get stuck. Please enable \"mgr_enable_op_tracker\", and the tracker \
will start to track new ops received afterwards.";

set<string> filters;
vector<string> filter_str;
if (cmd_getval(cmdmap, "filterstr", filter_str)) {
copy(filter_str.begin(), filter_str.end(),
inserter(filters, filters.end()));
}

if (admin_command == "dump_ops_in_flight") {
if (!op_tracker.dump_ops_in_flight(f, false, filters)) {
ss << error_str;
ret = -EINVAL;
goto out;
}
} else if (admin_command == "dump_blocked_ops") {
if (!op_tracker.dump_ops_in_flight(f, true, filters)) {
ss << error_str;
ret = -EINVAL;
goto out;
}
} else if (admin_command == "dump_blocked_ops_count") {
if (!op_tracker.dump_ops_in_flight(f, true, filters, true)) {
ss << error_str;
ret = -EINVAL;
goto out;
}
} else if (admin_command == "dump_historic_ops") {
if (!op_tracker.dump_historic_ops(f, false, filters)) {
ss << error_str;
ret = -EINVAL;
goto out;
}
} else if (admin_command == "dump_historic_ops_by_duration") {
if (!op_tracker.dump_historic_ops(f, true, filters)) {
ss << error_str;
ret = -EINVAL;
goto out;
}
} else if (admin_command == "dump_historic_slow_ops") {
if (!op_tracker.dump_historic_ops(f, true, filters)) {
ss << error_str;
ret = -EINVAL;
goto out;
}
}
}
dout(10) << "ret := " << ret << dendl;
return true;

out:
return false;
}