Skip to content

Commit

Permalink
Merge pull request #11183 from liewegas/wip-msgr-features
Browse files Browse the repository at this point in the history
msg: make loopback Connection feature accurate all the time

Reviewed-by: Kefu Chai <kchai@redhat.com>
  • Loading branch information
tchaikov committed Oct 11, 2016
2 parents b1853bf + 0dbe8fd commit bee4218
Show file tree
Hide file tree
Showing 20 changed files with 37 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/ceph_mds.cc
Expand Up @@ -141,7 +141,7 @@ int main(int argc, const char **argv)

Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MDS(-1), "mds",
nonce, 0, Messenger::HAS_MANY_CONNECTIONS);
nonce, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);
Expand Down
2 changes: 1 addition & 1 deletion src/ceph_mon.cc
Expand Up @@ -642,7 +642,7 @@ int main(int argc, const char **argv)
int rank = monmap.get_rank(g_conf->name.get_id());
Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(rank), "mon",
0, 0, Messenger::HAS_MANY_CONNECTIONS);
0, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
Expand Down
12 changes: 6 additions & 6 deletions src/ceph_osd.cc
Expand Up @@ -430,26 +430,26 @@ int main(int argc, const char **argv)

Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "client",
getpid(), 0,
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
getpid(), CEPH_FEATURES_ALL,
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
getpid(), 0, Messenger::HEARTBEAT);
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_server",
getpid(), 0, Messenger::HEARTBEAT);
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_front_server",
getpid(), 0, Messenger::HEARTBEAT);
getpid(), Messenger::HEARTBEAT);
Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid());
getpid(), 0);
if (!ms_public || !ms_cluster || !ms_hbclient || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
exit(1);
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
Expand Down
2 changes: 1 addition & 1 deletion src/mgr/DaemonServer.cc
Expand Up @@ -46,7 +46,7 @@ int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
{
// Initialize Messenger
msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MGR(gid), "server", getpid());
entity_name_t::MGR(gid), "server", getpid(), 0);
int r = msgr->bind(g_conf->public_addr);
if (r < 0)
return r;
Expand Down
8 changes: 4 additions & 4 deletions src/msg/Messenger.cc
Expand Up @@ -25,7 +25,7 @@ static Spinlock random_lock;

Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
uint64_t nonce, uint64_t features, uint64_t cflags)
uint64_t nonce, uint64_t cflags)
{
int r = -1;
if (type == "random") {
Expand All @@ -34,13 +34,13 @@ Messenger *Messenger::create(CephContext *cct, const string &type,
r = dis(random_engine);
}
if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, lname, nonce, features);
return new SimpleMessenger(cct, name, lname, nonce);
else if (r == 1 || type == "async")
return new AsyncMessenger(cct, name, lname, nonce, features);
return new AsyncMessenger(cct, name, lname, nonce);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
return new XioMessenger(cct, name, lname, nonce, features, cflags);
return new XioMessenger(cct, name, lname, nonce, cflags);
#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return nullptr;
Expand Down
3 changes: 1 addition & 2 deletions src/msg/Messenger.h
Expand Up @@ -166,8 +166,7 @@ class Messenger {
entity_name_t name,
string lname,
uint64_t nonce,
uint64_t features = 0,
uint64_t cflags = 0);
uint64_t cflags);

/**
* create a new messenger
Expand Down
3 changes: 1 addition & 2 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -259,7 +259,7 @@ class C_handle_reap : public EventCallback {
*/

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features)
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
dispatch_queue(cct, this, mname),
lock("AsyncMessenger::lock"),
Expand All @@ -274,7 +274,6 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
stack->start();
local_worker = stack->get_worker();
local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
local_features = features;
init_local_connection();
reap_handler = new C_handle_reap(this);
unsigned processor_num = 1;
Expand Down
5 changes: 2 additions & 3 deletions src/msg/async/AsyncMessenger.h
Expand Up @@ -84,7 +84,7 @@ class AsyncMessenger : public SimplePolicyMessenger {
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features);
string mname, uint64_t _nonce);

/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
Expand Down Expand Up @@ -306,7 +306,7 @@ class AsyncMessenger : public SimplePolicyMessenger {
assert(lock.is_locked());
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
local_connection->set_features(local_features);
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}

Expand All @@ -316,7 +316,6 @@ class AsyncMessenger : public SimplePolicyMessenger {

/// con used for sending messages to ourselves
ConnectionRef local_connection;
uint64_t local_features;

/**
* @defgroup AsyncMessenger internals
Expand Down
5 changes: 2 additions & 3 deletions src/msg/simple/SimpleMessenger.cc
Expand Up @@ -39,7 +39,7 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
*/

SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features)
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
accepter(this, _nonce),
dispatch_queue(cct, this, mname),
Expand All @@ -55,7 +55,6 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
"SimpleMessenger read timeout");
ceph_spin_init(&global_seq_lock);
local_features = features;
init_local_connection();
}

Expand Down Expand Up @@ -718,6 +717,6 @@ void SimpleMessenger::init_local_connection()
{
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
local_connection->set_features(local_features);
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}
3 changes: 1 addition & 2 deletions src/msg/simple/SimpleMessenger.h
Expand Up @@ -82,7 +82,7 @@ class SimpleMessenger : public SimplePolicyMessenger {
* features The local features bits for the local_connection
*/
SimpleMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features);
string mname, uint64_t _nonce);

/**
* Destroy the SimpleMessenger. Pretty simple since all the work is done
Expand Down Expand Up @@ -332,7 +332,6 @@ class SimpleMessenger : public SimplePolicyMessenger {

/// con used for sending messages to ourselves
ConnectionRef local_connection;
uint64_t local_features;

/**
* @defgroup SimpleMessenger internals
Expand Down
7 changes: 2 additions & 5 deletions src/msg/xio/XioMessenger.cc
Expand Up @@ -350,7 +350,7 @@ static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
}

XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features,
string mname, uint64_t _nonce,
uint64_t cflags, DispatchStrategy *ds)
: SimplePolicyMessenger(cct, name, mname, _nonce),
XioInit(cct),
Expand Down Expand Up @@ -378,7 +378,6 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
/* update class instance count */
nInstances.inc();

local_features = features;
loop_con->set_features(features);

ldout(cct,2) << "Create msgr: " << this << " instance: "
Expand Down Expand Up @@ -775,9 +774,7 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
return NULL;
XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
assert(!!xmsg);
new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt,
static_cast<XioMessenger*>(
xcon->get_messenger())->local_features);
new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
return xmsg;
}

Expand Down
5 changes: 1 addition & 4 deletions src/msg/xio/XioMessenger.h
Expand Up @@ -63,7 +63,7 @@ class XioMessenger : public SimplePolicyMessenger, XioInit

public:
XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t nonce, uint64_t features,
string mname, uint64_t nonce,
uint64_t cflags = 0,
DispatchStrategy* ds = new QueueStrategy(1));

Expand Down Expand Up @@ -156,9 +156,6 @@ class XioMessenger : public SimplePolicyMessenger, XioInit
protected:
virtual void ready()
{ }

public:
uint64_t local_features;
};

XioCommand* pool_alloc_xio_command(XioConnection *xcon);
Expand Down
2 changes: 1 addition & 1 deletion src/test/messenger/simple_client.cc
Expand Up @@ -105,7 +105,7 @@ int main(int argc, const char **argv)
messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(-1),
"client",
getpid());
getpid(), 0);

// enable timing prints
messenger->set_magic(MSG_MAGIC_TRACE_CTR);
Expand Down
3 changes: 2 additions & 1 deletion src/test/messenger/simple_server.cc
Expand Up @@ -76,7 +76,8 @@ int main(int argc, const char **argv)
messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(-1),
"simple_server",
0 /* nonce */);
0 /* nonce */,
0 /* flags */);
// enable timing prints
messenger->set_magic(MSG_MAGIC_TRACE_CTR);
messenger->set_default_policy(
Expand Down
2 changes: 1 addition & 1 deletion src/test/mon/test-mon-msg.cc
Expand Up @@ -79,7 +79,7 @@ class MonClientHelper : public Dispatcher
dout(1) << __func__ << dendl;

msg = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
"test-mon-msg", 0);
"test-mon-msg", 0, 0);
assert(msg != NULL);
msg->set_default_policy(Messenger::Policy::lossy_client(0,0));
dout(0) << __func__ << " starting messenger at "
Expand Down
2 changes: 1 addition & 1 deletion src/test/mon/test_mon_workloadgen.cc
Expand Up @@ -361,7 +361,7 @@ class OSDStub : public TestStub
stringstream ss;
ss << "client-osd" << whoami;
messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::OSD(whoami),
ss.str().c_str(), getpid()));
ss.str().c_str(), getpid(), 0));

Throttle throttler(g_ceph_context, "osd_client_bytes",
g_conf->osd_client_message_size_cap);
Expand Down
2 changes: 1 addition & 1 deletion src/test/msgr/perf_msgr_client.cc
Expand Up @@ -130,7 +130,7 @@ class MessengerClient {
addr.parse(serveraddr.c_str());
addr.set_nonce(0);
for (int i = 0; i < jobs; ++i) {
Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0);
msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
entity_inst_t inst(entity_name_t::OSD(0), addr);
ConnectionRef conn = msgr->get_connection(inst);
Expand Down
2 changes: 1 addition & 1 deletion src/test/msgr/perf_msgr_server.cc
Expand Up @@ -116,7 +116,7 @@ class MessengerServer {
public:
MessengerServer(string t, string addr, int threads, int delay):
msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0, 0);
msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
}
~MessengerServer() {
Expand Down
10 changes: 5 additions & 5 deletions src/test/msgr/test_msgr.cc
Expand Up @@ -65,8 +65,8 @@ class MessengerTest : public ::testing::TestWithParam<const char*> {
MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
virtual void SetUp() {
lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
client_msgr->set_default_policy(Messenger::Policy::lossy_client(0, 0));
}
Expand Down Expand Up @@ -952,7 +952,7 @@ class SyntheticWorkload {
char addr[64];
for (int i = 0; i < servers; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
"server", getpid()+i);
"server", getpid()+i, 0);
snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
bind_addr.parse(addr);
msgr->bind(bind_addr);
Expand All @@ -966,7 +966,7 @@ class SyntheticWorkload {

for (int i = 0; i < clients; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
"client", getpid()+i+servers);
"client", getpid()+i+servers, 0);
if (cli_policy.standby) {
snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
bind_addr.parse(addr);
Expand Down Expand Up @@ -1431,7 +1431,7 @@ class MarkdownDispatcher : public Dispatcher {

// Markdown with external lock
TEST_P(MessengerTest, MarkdownTest) {
Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
entity_addr_t bind_addr;
bind_addr.parse("127.0.0.1:16800");
Expand Down
4 changes: 2 additions & 2 deletions src/test/osd/TestOSDScrub.cc
Expand Up @@ -57,8 +57,8 @@ TEST(TestOSDScrub, scrub_time_permit) {
g_conf->osd_data,
g_conf->osd_journal);
Messenger *ms = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(0), "make_checker",
getpid());
entity_name_t::OSD(0), "make_checker",
getpid(), 0);
ms->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms->set_default_policy(Messenger::Policy::stateless_server(0, 0));
ms->bind(g_conf->public_addr);
Expand Down

0 comments on commit bee4218

Please sign in to comment.