Skip to content

Commit

Permalink
msg, ceph_osd: Support feature bits for all message type's local conn…
Browse files Browse the repository at this point in the history
…ection

Signed-off-by: David Zafman <dzafman@redhat.com>
  • Loading branch information
dzafman committed Jun 20, 2015
1 parent de04124 commit 626360a
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/ceph_osd.cc
Expand Up @@ -413,7 +413,7 @@ int main(int argc, const char **argv)
getpid());
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
getpid());
getpid(), CEPH_FEATURES_ALL);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
getpid());
Expand Down
8 changes: 4 additions & 4 deletions src/msg/Messenger.cc
Expand Up @@ -12,20 +12,20 @@

Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
uint64_t nonce)
uint64_t nonce, uint64_t features)
{
int r = -1;
if (type == "random")
r = rand() % 2; // random does not include xio
if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, lname, nonce);
return new SimpleMessenger(cct, name, lname, nonce, features);
else if ((r == 1 || type == "async") &&
cct->check_experimental_feature_enabled("ms-type-async"))
return new AsyncMessenger(cct, name, lname, nonce);
return new AsyncMessenger(cct, name, lname, nonce, features);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
return new XioMessenger(cct, name, lname, nonce);
return new XioMessenger(cct, name, lname, nonce, features);
#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return NULL;
Expand Down
4 changes: 3 additions & 1 deletion src/msg/Messenger.h
Expand Up @@ -151,12 +151,14 @@ class Messenger {
* @param name entity name to register
* @param lname logical name of the messenger in this process (e.g., "client")
* @param nonce nonce value to uniquely identify this instance on the current host
* @param features bits for the local connection
*/
static Messenger *create(CephContext *cct,
const string &type,
entity_name_t name,
string lname,
uint64_t nonce);
uint64_t nonce,
uint64_t features = 0);

/**
* @defgroup Accessors
Expand Down
3 changes: 2 additions & 1 deletion src/msg/async/AsyncMessenger.cc
Expand Up @@ -381,7 +381,7 @@ void WorkerPool::barrier()
*/

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
string mname, uint64_t _nonce, uint64_t features)
: SimplePolicyMessenger(cct, name,mname, _nonce),
processor(this, cct, _nonce),
lock("AsyncMessenger::lock"),
Expand All @@ -393,6 +393,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
Worker *w = pool->get_worker();
local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
local_features = features;
init_local_connection();
}

Expand Down
4 changes: 3 additions & 1 deletion src/msg/async/AsyncMessenger.h
Expand Up @@ -174,7 +174,7 @@ class AsyncMessenger : public SimplePolicyMessenger {
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce);
string mname, uint64_t _nonce, uint64_t features);

/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
Expand Down Expand Up @@ -396,13 +396,15 @@ class AsyncMessenger : public SimplePolicyMessenger {
assert(lock.is_locked());
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
local_connection->set_features(local_features);
ms_deliver_handle_fast_connect(local_connection.get());
}

public:

/// con used for sending messages to ourselves
ConnectionRef local_connection;
uint64_t local_features;

/**
* @defgroup AsyncMessenger internals
Expand Down
4 changes: 3 additions & 1 deletion src/msg/simple/SimpleMessenger.cc
Expand Up @@ -38,7 +38,7 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
*/

SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
string mname, uint64_t _nonce, uint64_t features)
: SimplePolicyMessenger(cct, name,mname, _nonce),
accepter(this, _nonce),
dispatch_queue(cct, this),
Expand All @@ -54,6 +54,7 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
local_connection(new PipeConnection(cct, this))
{
ceph_spin_init(&global_seq_lock);
local_features = features;
init_local_connection();
}

Expand Down Expand Up @@ -710,5 +711,6 @@ void SimpleMessenger::init_local_connection()
{
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
local_connection->set_features(local_features);
ms_deliver_handle_fast_connect(local_connection.get());
}
6 changes: 4 additions & 2 deletions src/msg/simple/SimpleMessenger.h
Expand Up @@ -79,9 +79,10 @@ class SimpleMessenger : public SimplePolicyMessenger {
* @param name The name to assign ourselves
* _nonce A unique ID to use for this SimpleMessenger. It should not
* be a value that will be repeated if the daemon restarts.
* features The local features bits for the local_connection
*/
SimpleMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce);
string mname, uint64_t _nonce, uint64_t features);

/**
* Destroy the SimpleMessenger. Pretty simple since all the work is done
Expand Down Expand Up @@ -331,6 +332,7 @@ class SimpleMessenger : public SimplePolicyMessenger {

/// con used for sending messages to ourselves
ConnectionRef local_connection;
uint64_t local_features;

/**
* @defgroup SimpleMessenger internals
Expand Down Expand Up @@ -369,7 +371,7 @@ class SimpleMessenger : public SimplePolicyMessenger {
int get_proto_version(int peer_type, bool connect);

/**
* Fill in the address and peer type for the local connection, which
* Fill in the features, address and peer type for the local connection, which
* is used for delivering messages back to ourself.
*/
void init_local_connection();
Expand Down
4 changes: 3 additions & 1 deletion src/msg/xio/XioMessenger.cc
Expand Up @@ -251,7 +251,7 @@ static string xio_uri_from_entity(const string &type,
/* XioMessenger */
XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce,
DispatchStrategy *ds)
DispatchStrategy *ds, uint64_t features)
: SimplePolicyMessenger(cct, name, mname, _nonce),
nsessions(0),
shutdown_called(false),
Expand Down Expand Up @@ -379,6 +379,8 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
/* update class instance count */
nInstances.inc();

loop_con.set_features(features);

} /* ctor */

int XioMessenger::pool_hint(uint32_t dsize) {
Expand Down
2 changes: 1 addition & 1 deletion src/msg/xio/XioMessenger.h
Expand Up @@ -54,7 +54,7 @@ class XioMessenger : public SimplePolicyMessenger
public:
XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t nonce,
DispatchStrategy* ds = new QueueStrategy(1));
DispatchStrategy* ds = new QueueStrategy(1), uint64_t features);

virtual ~XioMessenger();

Expand Down

0 comments on commit 626360a

Please sign in to comment.