Skip to content

Commit

Permalink
refactor(remote_commands): init
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Feb 1, 2024
1 parent 7d581a4 commit 3cb5420
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 61 deletions.
54 changes: 29 additions & 25 deletions src/runtime/service_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/join_point.h"
#include "utils/string_conv.h"
#include "utils/strings.h"

using namespace dsn::utils;
Expand Down Expand Up @@ -144,14 +145,11 @@ error_code service_node::start()
return err;
}

void service_node::get_runtime_info(const std::string &indent,
const std::vector<std::string> &args,
/*out*/ std::stringstream &ss)
std::string service_node::get_runtime_info(const std::string &indent,
const std::vector<std::string> &args)
{
ss << indent << full_name() << ":" << std::endl;

std::string indent2 = indent + "\t";
_computation->get_runtime_info(indent2, args, ss);
return fmt::format(
"{}{}:\n{}", indent, full_name(), _computation->get_runtime_info(indent + "\t", args));
}

void service_node::get_queue_info(
Expand Down Expand Up @@ -243,26 +241,32 @@ void service_engine::start_node(service_app_spec &app_spec)

std::string service_engine::get_runtime_info(const std::vector<std::string> &args)
{
std::stringstream ss;
if (args.size() == 0) {
ss << "" << service_engine::instance()._nodes_by_app_id.size()
<< " nodes available:" << std::endl;
for (auto &kv : service_engine::instance()._nodes_by_app_id) {
ss << "\t" << kv.second->id() << "." << kv.second->full_name() << std::endl;
}
} else {
std::string indent = "";
int id = atoi(args[0].c_str());
auto it = service_engine::instance()._nodes_by_app_id.find(id);
if (it != service_engine::instance()._nodes_by_app_id.end()) {
auto args2 = args;
args2.erase(args2.begin());
it->second->get_runtime_info(indent, args2, ss);
} else {
ss << "cannot find node with given app id";
// Overview.
if (args.empty()) {
auto result = fmt::format("{} nodes available:\n",
service_engine::instance()._nodes_by_app_id.size());
for (const auto &nodes_by_app_id : service_engine::instance()._nodes_by_app_id) {
result = fmt::format("{}\t{}.{}\n",
result,
nodes_by_app_id.second->id(),
nodes_by_app_id.second->full_name());
}
return result;
}
return ss.str();

int id;
if (!dsn::buf2int32(args[0], id)) {
return {"ERR: invalid argument, only one integer argument is acceptable"};
}

const auto &it = service_engine::instance()._nodes_by_app_id.find(id);
if (it == service_engine::instance()._nodes_by_app_id.end()) {
return fmt::format("ERR: cannot find node with given app id({})", id);
}

auto tmp_args = args;
tmp_args.erase(tmp_args.begin());
return it->second->get_runtime_info("", tmp_args);
}

std::string service_engine::get_queue_info(const std::vector<std::string> &args)
Expand Down
4 changes: 1 addition & 3 deletions src/runtime/service_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ class service_node
rpc_engine *rpc() const { return _rpc.get(); }
task_engine *computation() const { return _computation.get(); }

void get_runtime_info(const std::string &indent,
const std::vector<std::string> &args,
/*out*/ std::stringstream &ss);
std::string get_runtime_info(const std::string &indent, const std::vector<std::string> &args);
void get_queue_info(/*out*/ std::stringstream &ss);

dsn::error_code start();
Expand Down
64 changes: 41 additions & 23 deletions src/runtime/task/task_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,28 +184,40 @@ bool task_worker_pool::shared_same_worker_with_current_task(task *tsk) const
}
}

void task_worker_pool::get_runtime_info(const std::string &indent,
const std::vector<std::string> &args,
/*out*/ std::stringstream &ss)
std::string task_worker_pool::get_runtime_info(const std::string &indent,
const std::vector<std::string> &args)
{
std::string indent2 = indent + "\t";
ss << indent << "contains " << _workers.size() << " threads with " << _queues.size()
<< " queues" << std::endl;

for (auto &q : _queues) {
if (q) {
ss << indent2 << q->get_name() << " now has " << q->count() << " pending tasks"
<< std::endl;
const auto indent2 = fmt::format("{}\t", indent);

auto out = fmt::memory_buffer();
fmt::format_to(std::back_inserter(out),
"{}contains {} threads with {} queues",
indent,
_workers.size(),
_queues.size());
for (const auto &queue : _queues) {
if (queue) {
fmt::format_to(std::back_inserter(out),
"{}{} now has {} pending tasks\n",
indent2,
queue->get_name(),
queue->count());
}
}

for (auto &wk : _workers) {
if (wk) {
ss << indent2 << wk->index() << " (TID = " << wk->native_tid()
<< ") attached with queue " << wk->queue()->get_name() << std::endl;
for (const auto &worker : _workers) {
if (worker) {
fmt::format_to(std::back_inserter(out),
"{}{} (TID = {}) attached with queue {}\n",
indent2,
worker->index(),
worker->native_tid(),
worker->queue()->get_name());
}
}
return fmt::to_string(out);
}

void task_worker_pool::get_queue_info(/*out*/ std::stringstream &ss)
{
ss << "[";
Expand Down Expand Up @@ -279,17 +291,23 @@ volatile int *task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code
return pl->queues()[idx]->get_virtual_length_ptr();
}

void task_engine::get_runtime_info(const std::string &indent,
const std::vector<std::string> &args,
/*out*/ std::stringstream &ss)
std::string task_engine::get_runtime_info(const std::string &indent,
const std::vector<std::string> &args)
{
std::string indent2 = indent + "\t";
for (auto &p : _pools) {
if (p) {
ss << indent << p->spec().pool_code.to_string() << std::endl;
p->get_runtime_info(indent2, args, ss);
const auto indent2 = fmt::format("{}\t", indent);

auto out = fmt::memory_buffer();
fmt::format_to(std::back_inserter(out), indent2);
for (const auto &pool : _pools) {
if (pool) {
fmt::format_to(std::back_inserter(out),
"{}{}\n{}",
indent,
pool->spec().pool_code,
pool->get_runtime_info(indent2, args));
}
}
return fmt::to_string(out);
}

void task_engine::get_queue_info(/*out*/ std::stringstream &ss)
Expand Down
8 changes: 2 additions & 6 deletions src/runtime/task/task_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ class task_worker_pool
bool shared_same_worker_with_current_task(task *task) const;
task_engine *engine() const { return _owner; }
service_node *node() const { return _node; }
void get_runtime_info(const std::string &indent,
const std::vector<std::string> &args,
/*out*/ std::stringstream &ss);
std::string get_runtime_info(const std::string &indent, const std::vector<std::string> &args);
void get_queue_info(/*out*/ std::stringstream &ss);
std::vector<task_queue *> &queues() { return _queues; }
std::vector<task_worker *> &workers() { return _workers; }
Expand Down Expand Up @@ -118,9 +116,7 @@ class task_engine
volatile int *get_task_queue_virtual_length_ptr(dsn::task_code code, int hash);

service_node *node() const { return _node; }
void get_runtime_info(const std::string &indent,
const std::vector<std::string> &args,
/*out*/ std::stringstream &ss);
std::string get_runtime_info(const std::string &indent, const std::vector<std::string> &args);
void get_queue_info(/*out*/ std::stringstream &ss);

private:
Expand Down
4 changes: 1 addition & 3 deletions src/runtime/test/task_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ TEST(core, task_engine)

ASSERT_TRUE(engine->is_started());
std::vector<std::string> args;
std::stringstream oss;
engine->get_runtime_info(" ", args, oss);
printf("%s\n", oss.str().c_str());
fmt::print(stdout, "{}\n", engine->get_runtime_info(" ", args));

std::vector<task_worker_pool *> &pools = engine->pools();
for (size_t i = 0; i < pools.size(); ++i) {
Expand Down
2 changes: 1 addition & 1 deletion src/shell/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ static command_executor commands[] = {
"remote_command",
"send remote command to servers",
"[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l ip:port,ip:port...]"
"<command> [arguments...]",
" <command> [arguments...]",
remote_command,
},
{
Expand Down

0 comments on commit 3cb5420

Please sign in to comment.