Skip to content

Commit

Permalink
mgr: integrate optracker in ceph-mgr
Browse files Browse the repository at this point in the history
Signed-off-by: Prashant D <pdhange@redhat.com>
Signed-off-by: Michael J. Kidd <linuxkidd@gmail.com>
  • Loading branch information
Prashant D authored and linuxkidd committed Jan 29, 2024
1 parent c6b0b21 commit fc03f65
Show file tree
Hide file tree
Showing 11 changed files with 497 additions and 5 deletions.
48 changes: 48 additions & 0 deletions src/common/options/global.yaml.in
Expand Up @@ -6186,6 +6186,54 @@ options:
level: dev
desc: Time to wait during shutdown to deregister service with mgr
default: 1
- name: mgr_enable_op_tracker
type: bool
level: advanced
desc: Enable / disable MGR Op Tracker
default: true
with_legacy: true
- name: mgr_num_op_tracker_shard
type: uint
level: advanced
desc: The number of shards for holding the ops
default: 32
with_legacy: true
- name: mgr_op_complaint_time
type: float
level: advanced
default: 30
desc: An operation becomes complaint-worthy after the specified number of seconds has elapsed.
with_legacy: true
- name: mgr_op_log_threshold
type: int
level: advanced
default: 5
fmt_desc: How many operation logs to display at once.
with_legacy: true
- name: mgr_op_history_size
type: uint
level: advanced
default: 20
fmt_desc: The maximum number of completed operations to track.
with_legacy: true
- name: mgr_op_history_duration
type: uint
level: advanced
default: 600
desc: The oldest completed operation to track.
with_legacy: true
- name: mgr_op_history_slow_op_size
type: uint
level: advanced
default: 20
desc: Max number of slow ops to track
with_legacy: true
- name: mgr_op_history_slow_op_threshold
type: float
level: advanced
default: 10
desc: Track the op if over this threshold age in seconds
with_legacy: true
- name: throttler_perf_counter
type: bool
level: advanced
Expand Down
5 changes: 5 additions & 0 deletions src/mgr/CMakeLists.txt
Expand Up @@ -32,6 +32,8 @@ if(WITH_MGR)
PyOSDMap.cc
StandbyPyModules.cc
mgr_commands.cc
MgrOpRequest.cc
${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
$<TARGET_OBJECTS:mgr_cap_obj>)
add_executable(ceph-mgr ${mgr_srcs})
target_compile_definitions(ceph-mgr PRIVATE PY_SSIZE_T_CLEAN)
Expand All @@ -50,5 +52,8 @@ if(WITH_MGR)
${GSSAPI_LIBRARIES})
set_target_properties(ceph-mgr PROPERTIES
POSITION_INDEPENDENT_CODE ${EXE_LINKER_USE_PIE})
if(WITH_LTTNG)
add_dependencies(ceph-mgr mgr_op_tp)
endif()
install(TARGETS ceph-mgr DESTINATION bin)
endif()
145 changes: 143 additions & 2 deletions src/mgr/DaemonServer.cc
Expand Up @@ -24,6 +24,7 @@
#include "mgr/DaemonHealthMetricCollector.h"
#include "mgr/OSDPerfMetricCollector.h"
#include "mgr/MDSPerfMetricCollector.h"
#include "mgr/MgrOpRequest.h"
#include "mon/MonCommand.h"

#include "messages/MMgrOpen.h"
Expand All @@ -49,6 +50,7 @@
using namespace TOPNSPC::common;

using std::list;
using std::ostream;
using std::ostringstream;
using std::string;
using std::stringstream;
Expand Down Expand Up @@ -96,22 +98,55 @@ DaemonServer::DaemonServer(MonClient *monc_,
py_modules(py_modules_),
clog(clog_),
audit_clog(audit_clog_),
asok_hook(nullptr),
pgmap_ready(false),
timer(g_ceph_context, lock),
tick_event(nullptr),
osd_perf_metric_collector_listener(this),
osd_perf_metric_collector(osd_perf_metric_collector_listener),
mds_perf_metric_collector_listener(this),
mds_perf_metric_collector(mds_perf_metric_collector_listener)
mds_perf_metric_collector(mds_perf_metric_collector_listener),
op_tracker(g_ceph_context, g_ceph_context->_conf->mgr_enable_op_tracker,
g_ceph_context->_conf->mgr_num_op_tracker_shard)
{
g_conf().add_observer(this);
/* define op size and time for mgr daemon */
op_tracker.set_complaint_and_threshold(cct->_conf->mgr_op_complaint_time,
cct->_conf->mgr_op_log_threshold);
op_tracker.set_history_size_and_duration(cct->_conf->mgr_op_history_size,
cct->_conf->mgr_op_history_duration);
op_tracker.set_history_slow_op_size_and_threshold(cct->_conf->mgr_op_history_slow_op_size,
cct->_conf->mgr_op_history_slow_op_threshold);
}

/**
 * Admin socket hook that forwards the commands it is registered for to
 * DaemonServer::asok_command().
 *
 * The class is defined ahead of ~DaemonServer() so that the destructor can
 * delete the hook through a complete type.
 */
class DaemonServerHook : public AdminSocketHook {
  DaemonServer *daemon_server;
public:
  explicit DaemonServerHook(DaemonServer *o) : daemon_server(o) {}

  /**
   * Run one admin socket command.
   *
   * @param admin_command command prefix being executed
   * @param cmdmap        parsed command arguments
   * @param f             formatter that receives the structured output
   * @param errss         receives the error text when the command fails
   * @param out           receives any plain-text output on success
   * @return 0 on success, -EINVAL when the command fails or an argument
   *         cannot be parsed
   */
  int call(std::string_view admin_command,
           const cmdmap_t& cmdmap,
           const bufferlist&,
           Formatter *f,
           std::ostream& errss,
           bufferlist& out) override {
    stringstream outss;
    try {
      // asok_command() reports success as a bool; translate it into the
      // 0 / negative-errno convention of this interface instead of letting
      // the bool convert to 1 (success) or 0 (failure).
      if (!daemon_server->asok_command(admin_command, cmdmap, f, outss)) {
        // On failure the stream holds the explanation, so surface it as
        // the error message rather than as regular output.
        errss << outss.str();
        return -EINVAL;
      }
    } catch (const TOPNSPC::common::bad_cmd_get& e) {
      errss << e.what();
      return -EINVAL;
    }
    out.append(outss);
    return 0;
  }
};

/**
 * Tear down the server: drop the admin socket hook created by init() (if
 * any), release the messenger and stop observing configuration changes.
 */
DaemonServer::~DaemonServer() {
  if (asok_hook) {
    // Unregister before deleting so the admin socket can no longer invoke
    // a hook that points at a destroyed DaemonServer.
    g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
    delete asok_hook;
    asok_hook = nullptr;
  }
  delete msgr;
  g_conf().remove_observer(this);
}

int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
// Initialize Messenger
Expand Down Expand Up @@ -180,6 +215,40 @@ int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
schedule_tick_locked(
g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());

op_tracker.set_tracking(cct->_conf->mgr_enable_op_tracker);

AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
asok_hook = new DaemonServerHook(this);
r = admin_socket->register_command("dump_ops_in_flight " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show the ops currently in flight");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_blocked_ops " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show the blocked ops currently in flight");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_blocked_ops_count " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show the count of blocked ops currently in flight");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_historic_ops " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show recent ops");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_historic_slow_ops " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show slowest recent ops");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_historic_ops_by_duration " \
"name=filterstr,type=CephString,n=N,req=false",
asok_hook,
"show slowest recent ops, sorted by duration");
ceph_assert(r == 0);
return 0;
}

Expand Down Expand Up @@ -1067,6 +1136,8 @@ bool DaemonServer::_handle_command(
} else {
m = cmdctx->m_mgr;
}
MessageRef mref = std::move(m.get());
MgrOpRequestRef op = op_tracker.create_request<MgrOpRequest, MessageRef>(mref);
auto priv = m->get_connection()->get_priv();
auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
Expand All @@ -1076,6 +1147,7 @@ bool DaemonServer::_handle_command(
session->inst.name = m->get_source();
}

op->mark_started();
map<string,string> param_str_map;
std::stringstream ss;
int r = 0;
Expand Down Expand Up @@ -2439,9 +2511,12 @@ bool DaemonServer::_handle_command(
return true;
}

op->mark_queued_for_module();

dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
Finisher& mod_finisher = py_modules.get_active_module_finisher(mod_name);
mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]

mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix, op]
(int r_) mutable {
std::stringstream ss;

Expand Down Expand Up @@ -2494,6 +2569,7 @@ bool DaemonServer::_handle_command(

std::stringstream ds;
bufferlist inbl = cmdctx->data;
op->mark_reached(py_command.module_name.c_str());
int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
inbl, &ds, &ss);
if (r == -EACCES) {
Expand Down Expand Up @@ -3131,3 +3207,68 @@ int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
{
return mds_perf_metric_collector.get_counters(collector);
}

/**
 * Execute an op tracker admin socket command.
 *
 * Handles dump_ops_in_flight, dump_blocked_ops, dump_blocked_ops_count,
 * dump_historic_ops, dump_historic_ops_by_duration and
 * dump_historic_slow_ops. Any other command is ignored and reported as
 * success.
 *
 * @param admin_command command prefix to execute
 * @param cmdmap        parsed arguments; the optional "filterstr" values
 *                      are passed to the tracker as filters
 * @param f             formatter that receives the dump
 * @param ss            receives an explanatory message when the dump fails
 * @return true on success, false when the op tracker refused the dump
 */
bool DaemonServer::asok_command(
  std::string_view admin_command,
  const cmdmap_t& cmdmap,
  Formatter *f,
  ostream& ss)
{
  std::lock_guard l(lock);

  const bool is_op_dump =
    admin_command == "dump_ops_in_flight" ||
    admin_command == "dump_blocked_ops" ||
    admin_command == "dump_blocked_ops_count" ||
    admin_command == "dump_historic_ops" ||
    admin_command == "dump_historic_ops_by_duration" ||
    admin_command == "dump_historic_slow_ops";

  int ret = 0;
  if (is_op_dump) {
    // Point the operator at the mgr option: the tracker used here is
    // configured from mgr_enable_op_tracker, not the OSD setting.
    const string error_str =
      "op_tracker tracking is not enabled now, so no ops are tracked "
      "currently, even those get stuck. Please enable "
      "\"mgr_enable_op_tracker\", and the tracker will start to track "
      "new ops received afterwards.";

    set<string> filters;
    vector<string> filter_str;
    if (cmd_getval(cmdmap, "filterstr", filter_str)) {
      filters.insert(filter_str.begin(), filter_str.end());
    }

    bool dumped = true;
    if (admin_command == "dump_ops_in_flight") {
      dumped = op_tracker.dump_ops_in_flight(f, false, filters);
    } else if (admin_command == "dump_blocked_ops") {
      dumped = op_tracker.dump_ops_in_flight(f, true, filters);
    } else if (admin_command == "dump_blocked_ops_count") {
      dumped = op_tracker.dump_ops_in_flight(f, true, filters, true);
    } else if (admin_command == "dump_historic_ops") {
      dumped = op_tracker.dump_historic_ops(f, false, filters);
    } else if (admin_command == "dump_historic_ops_by_duration") {
      dumped = op_tracker.dump_historic_ops(f, true, filters);
    } else {
      // dump_historic_slow_ops: registered with the admin socket but
      // previously never dispatched, so it produced no output at all.
      dumped = op_tracker.dump_historic_slow_ops(f, filters);
    }

    if (!dumped) {
      ss << error_str;
      ret = -EINVAL;
    }
  }

  // Log the outcome on every path, including failures.
  dout(10) << "ret := " << ret << dendl;
  return ret == 0;
}
12 changes: 11 additions & 1 deletion src/mgr/DaemonServer.h
Expand Up @@ -33,6 +33,7 @@
#include "MetricCollector.h"
#include "OSDPerfMetricCollector.h"
#include "MDSPerfMetricCollector.h"
#include "MgrOpRequest.h"

class MMgrReport;
class MMgrOpen;
Expand Down Expand Up @@ -117,7 +118,6 @@ struct offline_pg_report {
}
};


/**
* Server used in ceph-mgr to communicate with Ceph daemons like
* MDSs and OSDs.
Expand Down Expand Up @@ -165,6 +165,8 @@ class DaemonServer : public Dispatcher, public md_config_obs_t
const std::map<std::string,std::string>& param_str_map,
const MonCommand *this_cmd);

class DaemonServerHook *asok_hook;

private:
friend class ReplyOnFinish;
bool _reply(MCommand* m,
Expand Down Expand Up @@ -251,6 +253,9 @@ class DaemonServer : public Dispatcher, public md_config_obs_t

void update_task_status(DaemonKey key,
const std::map<std::string,std::string>& task_status);
private:
// -- op tracking --
OpTracker op_tracker;

public:
int init(uint64_t gid, entity_addrvec_t client_addrs);
Expand Down Expand Up @@ -309,6 +314,11 @@ class DaemonServer : public Dispatcher, public md_config_obs_t
void log_access_denied(std::shared_ptr<CommandContext>& cmdctx,
MgrSession* session, std::stringstream& ss);
void dump_pg_ready(ceph::Formatter *f);

bool asok_command(std::string_view admin_command,
const cmdmap_t& cmdmap,
Formatter *f,
std::ostream& ss);
};

#endif
Expand Down
5 changes: 4 additions & 1 deletion src/mgr/Mgr.cc
Expand Up @@ -27,6 +27,7 @@
#endif

#include "mgr/MgrContext.h"
#include "mgr/MgrOpRequest.h"

#include "DaemonServer.h"
#include "messages/MMgrDigest.h"
Expand Down Expand Up @@ -66,7 +67,9 @@ Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
clog(clog_),
audit_clog(audit_clog_),
initialized(false),
initializing(false)
initializing(false),
op_tracker(g_ceph_context, g_ceph_context->_conf->mgr_enable_op_tracker,
g_ceph_context->_conf->mgr_num_op_tracker_shard)
{
cluster_state.set_objecter(objecter);
}
Expand Down
5 changes: 5 additions & 0 deletions src/mgr/Mgr.h
Expand Up @@ -22,6 +22,7 @@
#include "msg/Messenger.h"
#include "auth/Auth.h"
#include "common/Finisher.h"
#include "common/TrackedOp.h"
#include "mon/MgrMap.h"

#include "DaemonServer.h"
Expand Down Expand Up @@ -70,6 +71,10 @@ class Mgr : public AdminSocketHook {
bool initialized;
bool initializing;

private:
// -- op tracking --
OpTracker op_tracker;

public:
Mgr(MonClient *monc_, const MgrMap& mgrmap,
PyModuleRegistry *py_module_registry_,
Expand Down

0 comments on commit fc03f65

Please sign in to comment.