Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg: make loopback Connection feature accurate all the time #11183

Merged
merged 1 commit into from Oct 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ceph_mds.cc
Expand Up @@ -141,7 +141,7 @@ int main(int argc, const char **argv)

Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MDS(-1), "mds",
nonce, 0, Messenger::HAS_MANY_CONNECTIONS);
nonce, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);
Expand Down
2 changes: 1 addition & 1 deletion src/ceph_mon.cc
Expand Up @@ -642,7 +642,7 @@ int main(int argc, const char **argv)
int rank = monmap.get_rank(g_conf->name.get_id());
Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(rank), "mon",
0, 0, Messenger::HAS_MANY_CONNECTIONS);
0, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
Expand Down
12 changes: 6 additions & 6 deletions src/ceph_osd.cc
Expand Up @@ -430,26 +430,26 @@ int main(int argc, const char **argv)

Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "client",
getpid(), 0,
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
getpid(), CEPH_FEATURES_ALL,
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
getpid(), 0, Messenger::HEARTBEAT);
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_server",
getpid(), 0, Messenger::HEARTBEAT);
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_front_server",
getpid(), 0, Messenger::HEARTBEAT);
getpid(), Messenger::HEARTBEAT);
Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid());
getpid(), 0);
if (!ms_public || !ms_cluster || !ms_hbclient || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
exit(1);
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
Expand Down
2 changes: 1 addition & 1 deletion src/mgr/DaemonServer.cc
Expand Up @@ -46,7 +46,7 @@ int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
{
// Initialize Messenger
msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MGR(gid), "server", getpid());
entity_name_t::MGR(gid), "server", getpid(), 0);
int r = msgr->bind(g_conf->public_addr);
if (r < 0)
return r;
Expand Down
8 changes: 4 additions & 4 deletions src/msg/Messenger.cc
Expand Up @@ -25,7 +25,7 @@ static Spinlock random_lock;

Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
uint64_t nonce, uint64_t features, uint64_t cflags)
uint64_t nonce, uint64_t cflags)
{
int r = -1;
if (type == "random") {
Expand All @@ -34,13 +34,13 @@ Messenger *Messenger::create(CephContext *cct, const string &type,
r = dis(random_engine);
}
if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, lname, nonce, features);
return new SimpleMessenger(cct, name, lname, nonce);
else if (r == 1 || type == "async")
return new AsyncMessenger(cct, name, lname, nonce, features);
return new AsyncMessenger(cct, name, lname, nonce);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
return new XioMessenger(cct, name, lname, nonce, features, cflags);
return new XioMessenger(cct, name, lname, nonce, cflags);
#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return nullptr;
Expand Down
3 changes: 1 addition & 2 deletions src/msg/Messenger.h
Expand Up @@ -166,8 +166,7 @@ class Messenger {
entity_name_t name,
string lname,
uint64_t nonce,
uint64_t features = 0,
uint64_t cflags = 0);
uint64_t cflags);

/**
* create a new messenger
Expand Down
3 changes: 1 addition & 2 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -259,7 +259,7 @@ class C_handle_reap : public EventCallback {
*/

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features)
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
dispatch_queue(cct, this, mname),
lock("AsyncMessenger::lock"),
Expand All @@ -274,7 +274,6 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
stack->start();
local_worker = stack->get_worker();
local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
local_features = features;
init_local_connection();
reap_handler = new C_handle_reap(this);
unsigned processor_num = 1;
Expand Down
5 changes: 2 additions & 3 deletions src/msg/async/AsyncMessenger.h
Expand Up @@ -84,7 +84,7 @@ class AsyncMessenger : public SimplePolicyMessenger {
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features);
string mname, uint64_t _nonce);

/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
Expand Down Expand Up @@ -306,7 +306,7 @@ class AsyncMessenger : public SimplePolicyMessenger {
assert(lock.is_locked());
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
local_connection->set_features(local_features);
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}

Expand All @@ -316,7 +316,6 @@ class AsyncMessenger : public SimplePolicyMessenger {

/// con used for sending messages to ourselves
ConnectionRef local_connection;
uint64_t local_features;

/**
* @defgroup AsyncMessenger internals
Expand Down
5 changes: 2 additions & 3 deletions src/msg/simple/SimpleMessenger.cc
Expand Up @@ -39,7 +39,7 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
*/

SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features)
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
accepter(this, _nonce),
dispatch_queue(cct, this, mname),
Expand All @@ -55,7 +55,6 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
"SimpleMessenger read timeout");
ceph_spin_init(&global_seq_lock);
local_features = features;
init_local_connection();
}

Expand Down Expand Up @@ -718,6 +717,6 @@ void SimpleMessenger::init_local_connection()
{
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
local_connection->set_features(local_features);
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}
3 changes: 1 addition & 2 deletions src/msg/simple/SimpleMessenger.h
Expand Up @@ -82,7 +82,7 @@ class SimpleMessenger : public SimplePolicyMessenger {
* features The local features bits for the local_connection
*/
SimpleMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features);
string mname, uint64_t _nonce);

/**
* Destroy the SimpleMessenger. Pretty simple since all the work is done
Expand Down Expand Up @@ -332,7 +332,6 @@ class SimpleMessenger : public SimplePolicyMessenger {

/// con used for sending messages to ourselves
ConnectionRef local_connection;
uint64_t local_features;

/**
* @defgroup SimpleMessenger internals
Expand Down
7 changes: 2 additions & 5 deletions src/msg/xio/XioMessenger.cc
Expand Up @@ -350,7 +350,7 @@ static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
}

XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce, uint64_t features,
string mname, uint64_t _nonce,
uint64_t cflags, DispatchStrategy *ds)
: SimplePolicyMessenger(cct, name, mname, _nonce),
XioInit(cct),
Expand Down Expand Up @@ -378,7 +378,6 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
/* update class instance count */
nInstances.inc();

local_features = features;
loop_con->set_features(features);

ldout(cct,2) << "Create msgr: " << this << " instance: "
Expand Down Expand Up @@ -775,9 +774,7 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
return NULL;
XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
assert(!!xmsg);
new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt,
static_cast<XioMessenger*>(
xcon->get_messenger())->local_features);
new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
return xmsg;
}

Expand Down
5 changes: 1 addition & 4 deletions src/msg/xio/XioMessenger.h
Expand Up @@ -63,7 +63,7 @@ class XioMessenger : public SimplePolicyMessenger, XioInit

public:
XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t nonce, uint64_t features,
string mname, uint64_t nonce,
uint64_t cflags = 0,
DispatchStrategy* ds = new QueueStrategy(1));

Expand Down Expand Up @@ -156,9 +156,6 @@ class XioMessenger : public SimplePolicyMessenger, XioInit
protected:
virtual void ready()
{ }

public:
uint64_t local_features;
};

XioCommand* pool_alloc_xio_command(XioConnection *xcon);
Expand Down
2 changes: 1 addition & 1 deletion src/test/messenger/simple_client.cc
Expand Up @@ -105,7 +105,7 @@ int main(int argc, const char **argv)
messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(-1),
"client",
getpid());
getpid(), 0);

// enable timing prints
messenger->set_magic(MSG_MAGIC_TRACE_CTR);
Expand Down
3 changes: 2 additions & 1 deletion src/test/messenger/simple_server.cc
Expand Up @@ -76,7 +76,8 @@ int main(int argc, const char **argv)
messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(-1),
"simple_server",
0 /* nonce */);
0 /* nonce */,
0 /* flags */);
// enable timing prints
messenger->set_magic(MSG_MAGIC_TRACE_CTR);
messenger->set_default_policy(
Expand Down
2 changes: 1 addition & 1 deletion src/test/mon/test-mon-msg.cc
Expand Up @@ -79,7 +79,7 @@ class MonClientHelper : public Dispatcher
dout(1) << __func__ << dendl;

msg = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
"test-mon-msg", 0);
"test-mon-msg", 0, 0);
assert(msg != NULL);
msg->set_default_policy(Messenger::Policy::lossy_client(0,0));
dout(0) << __func__ << " starting messenger at "
Expand Down
2 changes: 1 addition & 1 deletion src/test/mon/test_mon_workloadgen.cc
Expand Up @@ -361,7 +361,7 @@ class OSDStub : public TestStub
stringstream ss;
ss << "client-osd" << whoami;
messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::OSD(whoami),
ss.str().c_str(), getpid()));
ss.str().c_str(), getpid(), 0));

Throttle throttler(g_ceph_context, "osd_client_bytes",
g_conf->osd_client_message_size_cap);
Expand Down
2 changes: 1 addition & 1 deletion src/test/msgr/perf_msgr_client.cc
Expand Up @@ -130,7 +130,7 @@ class MessengerClient {
addr.parse(serveraddr.c_str());
addr.set_nonce(0);
for (int i = 0; i < jobs; ++i) {
Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0);
msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
entity_inst_t inst(entity_name_t::OSD(0), addr);
ConnectionRef conn = msgr->get_connection(inst);
Expand Down
2 changes: 1 addition & 1 deletion src/test/msgr/perf_msgr_server.cc
Expand Up @@ -116,7 +116,7 @@ class MessengerServer {
public:
MessengerServer(string t, string addr, int threads, int delay):
msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0, 0);
msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
}
~MessengerServer() {
Expand Down
10 changes: 5 additions & 5 deletions src/test/msgr/test_msgr.cc
Expand Up @@ -65,8 +65,8 @@ class MessengerTest : public ::testing::TestWithParam<const char*> {
MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
virtual void SetUp() {
lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
client_msgr->set_default_policy(Messenger::Policy::lossy_client(0, 0));
}
Expand Down Expand Up @@ -952,7 +952,7 @@ class SyntheticWorkload {
char addr[64];
for (int i = 0; i < servers; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
"server", getpid()+i);
"server", getpid()+i, 0);
snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
bind_addr.parse(addr);
msgr->bind(bind_addr);
Expand All @@ -966,7 +966,7 @@ class SyntheticWorkload {

for (int i = 0; i < clients; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
"client", getpid()+i+servers);
"client", getpid()+i+servers, 0);
if (cli_policy.standby) {
snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
bind_addr.parse(addr);
Expand Down Expand Up @@ -1431,7 +1431,7 @@ class MarkdownDispatcher : public Dispatcher {

// Markdown with external lock
TEST_P(MessengerTest, MarkdownTest) {
Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
entity_addr_t bind_addr;
bind_addr.parse("127.0.0.1:16800");
Expand Down
4 changes: 2 additions & 2 deletions src/test/osd/TestOSDScrub.cc
Expand Up @@ -57,8 +57,8 @@ TEST(TestOSDScrub, scrub_time_permit) {
g_conf->osd_data,
g_conf->osd_journal);
Messenger *ms = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(0), "make_checker",
getpid());
entity_name_t::OSD(0), "make_checker",
getpid(), 0);
ms->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms->set_default_policy(Messenger::Policy::stateless_server(0, 0));
ms->bind(g_conf->public_addr);
Expand Down