Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools/crimson/perf_crimson_msgr:fix perf_crimson_msgr abort #47039

Merged
merged 2 commits into from Jul 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 27 additions & 21 deletions src/tools/crimson/perf_crimson_msgr.cc
Expand Up @@ -18,6 +18,7 @@

#include "crimson/auth/DummyAuth.h"
#include "crimson/common/log.h"
#include "crimson/common/config_proxy.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
Expand Down Expand Up @@ -66,7 +67,7 @@ struct client_config {
unsigned msgtime;
unsigned jobs;
unsigned depth;
bool v1_crc_enabled;
bool crc_enabled;

std::string str() const {
std::ostringstream out;
Expand All @@ -76,7 +77,7 @@ struct client_config {
<< ", msgtime=" << msgtime
<< ", jobs=" << jobs
<< ", depth=" << depth
<< ", v1-crc-enabled=" << v1_crc_enabled
<< ", crc-enabled=" << crc_enabled
<< ")";
return out.str();
}
Expand All @@ -93,7 +94,7 @@ struct client_config {
conf.jobs = options["jobs"].as<unsigned>();
conf.depth = options["depth"].as<unsigned>();
ceph_assert(conf.depth % conf.jobs == 0);
conf.v1_crc_enabled = options["v1-crc-enabled"].as<bool>();
conf.crc_enabled = options["crc-enabled"].as<bool>();
return conf;
}
};
Expand All @@ -102,14 +103,14 @@ struct server_config {
entity_addr_t addr;
unsigned block_size;
unsigned core;
bool v1_crc_enabled;
bool crc_enabled;

std::string str() const {
std::ostringstream out;
out << "server[" << addr
<< "](bs=" << block_size
<< ", core=" << core
<< ", v1-crc-enabled=" << v1_crc_enabled
<< ", crc-enabled=" << crc_enabled
<< ")";
return out.str();
}
Expand All @@ -122,7 +123,7 @@ struct server_config {
conf.addr = addr;
conf.block_size = options["sbs"].as<unsigned>();
conf.core = options["core"].as<unsigned>();
conf.v1_crc_enabled = options["v1-crc-enabled"].as<bool>();
conf.crc_enabled = options["crc-enabled"].as<bool>();
return conf;
}
};
Expand Down Expand Up @@ -173,14 +174,14 @@ static seastar::future<> run(
return {seastar::now()};
}

seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
return seastar::smp::submit_to(msgr_sid, [v1_crc_enabled, addr, this] {
seastar::future<> init(bool crc_enabled, const entity_addr_t& addr) {
return seastar::smp::submit_to(msgr_sid, [crc_enabled, addr, this] {
// server msgr is always with nonce 0
msgr = crimson::net::Messenger::create(entity_name_t::OSD(msgr_sid), lname, 0);
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
if (v1_crc_enabled) {
if (crc_enabled) {
msgr->set_crc_header();
msgr->set_crc_data();
}
Expand Down Expand Up @@ -339,15 +340,15 @@ static seastar::future<> run(
return sid != 0 && sid <= jobs;
}

seastar::future<> init(bool v1_crc_enabled) {
return container().invoke_on_all([v1_crc_enabled] (auto& client) {
seastar::future<> init(bool crc_enabled) {
return container().invoke_on_all([crc_enabled] (auto& client) {
if (client.is_active()) {
client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
client.msgr->set_require_authorizer(false);
client.msgr->set_auth_client(&client.dummy_auth);
client.msgr->set_auth_server(&client.dummy_auth);
if (v1_crc_enabled) {
if (crc_enabled) {
client.msgr->set_crc_header();
client.msgr->set_crc_data();
}
Expand Down Expand Up @@ -653,7 +654,10 @@ static seastar::future<> run(

return seastar::when_all(
test_state::Server::create(server_conf.core, server_conf.block_size),
create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth)
create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
return crimson::common::local_conf().start();
})
).then([=](auto&& ret) {
auto fp_server = std::move(std::get<0>(ret).get0());
auto client = std::move(std::get<1>(ret).get0());
Expand All @@ -666,8 +670,8 @@ static seastar::future<> run(
ceph_assert(seastar::smp::count >= 1+server_conf.core);
ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
return seastar::when_all_succeed(
server->init(server_conf.v1_crc_enabled, server_conf.addr),
client->init(client_conf.v1_crc_enabled)
server->init(server_conf.crc_enabled, server_conf.addr),
client->init(client_conf.crc_enabled)
).then_unpack([client, addr = client_conf.server_addr] {
return client->connect_wait_verify(addr);
}).then([client, ramptime = client_conf.ramptime,
Expand All @@ -682,7 +686,7 @@ static seastar::future<> run(
logger().info("\nperf settings:\n {}\n", client_conf.str());
ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
ceph_assert(client_conf.jobs > 0);
return client->init(client_conf.v1_crc_enabled
return client->init(client_conf.crc_enabled
).then([client, addr = client_conf.server_addr] {
return client->connect_wait_verify(addr);
}).then([client, ramptime = client_conf.ramptime,
Expand All @@ -694,7 +698,7 @@ static seastar::future<> run(
} else { // mode == perf_mode_t::server
ceph_assert(seastar::smp::count >= 1+server_conf.core);
logger().info("\nperf settings:\n {}\n", server_conf.str());
return server->init(server_conf.v1_crc_enabled, server_conf.addr
return server->init(server_conf.crc_enabled, server_conf.addr
// dispatch ops
).then([server] {
return server->wait();
Expand All @@ -703,6 +707,8 @@ static seastar::future<> run(
return server->shutdown().then([cleanup = std::move(fp_server)] {});
});
}
}).finally([] {
return crimson::common::sharded_conf().stop();
});
}

Expand All @@ -714,8 +720,8 @@ int main(int argc, char** argv)
app.add_options()
("mode", bpo::value<unsigned>()->default_value(0),
"0: both, 1:client, 2:server")
("addr", bpo::value<std::string>()->default_value("v1:127.0.0.1:9010"),
"server address")
("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
"server address(only support msgr v2 protocol)")
("ramptime", bpo::value<unsigned>()->default_value(5),
"seconds of client ramp-up time")
("msgtime", bpo::value<unsigned>()->default_value(15),
Expand All @@ -730,8 +736,8 @@ int main(int argc, char** argv)
"server running core")
("sbs", bpo::value<unsigned>()->default_value(0),
"server block size")
("v1-crc-enabled", bpo::value<bool>()->default_value(false),
"enable v1 CRC checks");
("crc-enabled", bpo::value<bool>()->default_value(false),
"enable CRC checks");
return app.run(argc, argv, [&app] {
auto&& config = app.configuration();
auto mode = config["mode"].as<unsigned>();
Expand Down