Skip to content

Commit

Permalink
msg/connection: store entity number
Browse files Browse the repository at this point in the history
Store peer_id within instance of Connection object, so we can easily see
to what entity that connection was connected.

Signed-off-by: Piotr Dałek <git@predictor.org.pl>
  • Loading branch information
branch-predictor committed Aug 20, 2016
1 parent 42a42c2 commit 6841eff
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3207,6 +3207,7 @@ void Monitor::handle_forward(MonOpRequestRef op)
c->set_priv(s->get());
c->set_peer_addr(m->client.addr);
c->set_peer_type(m->client.name.type());
c->set_peer_id(m->client.name.num());
c->set_features(m->con_features);

s->caps = m->client_caps;
Expand Down
3 changes: 3 additions & 0 deletions src/msg/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct Connection : public RefCountedObject {
Messenger *msgr;
RefCountedObject *priv;
int peer_type;
int64_t peer_id;
entity_addr_t peer_addr;
utime_t last_keepalive, last_keepalive_ack;
private:
Expand Down Expand Up @@ -153,6 +154,8 @@ struct Connection : public RefCountedObject {

int get_peer_type() const { return peer_type; }
void set_peer_type(int t) { peer_type = t; }
int64_t get_peer_id() const { return peer_id; }
void set_peer_id(int64_t id) { peer_id = id; }

bool peer_is_mon() const { return peer_type == CEPH_ENTITY_TYPE_MON; }
bool peer_is_mds() const { return peer_type == CEPH_ENTITY_TYPE_MDS; }
Expand Down
5 changes: 5 additions & 0 deletions src/msg/simple/Pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
public:
int port;
int peer_type;
int64_t peer_id;
entity_addr_t peer_addr;
Messenger::Policy policy;

Expand Down Expand Up @@ -290,6 +291,10 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
peer_type = t;
connection_state->set_peer_type(t);
}
void set_peer_id(int64_t t) {
peer_id = t;
connection_state->set_peer_id(t);
}

void register_pipe();
void unregister_pipe();
Expand Down
15 changes: 8 additions & 7 deletions src/msg/simple/SimpleMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest)
lock.Lock();
Pipe *pipe = _lookup_pipe(dest.addr);
submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
dest.addr, dest.name.type(), true);
dest.addr, dest.name.type(), dest.name.num(), true);
lock.Unlock();
return 0;
}
Expand All @@ -137,7 +137,7 @@ int SimpleMessenger::_send_message(Message *m, Connection *con)
<< dendl;

submit_message(m, static_cast<PipeConnection*>(con),
con->get_peer_addr(), con->get_peer_type(), false);
con->get_peer_addr(), con->get_peer_type(), con->get_peer_id(), false);
return 0;
}

Expand Down Expand Up @@ -343,7 +343,7 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd)
* NOTE: assumes messenger.lock held.
*/
Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
int type,
int type, int64_t num,
PipeConnection *con,
Message *first)
{
Expand All @@ -357,6 +357,7 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
static_cast<PipeConnection*>(con));
pipe->pipe_lock.Lock();
pipe->set_peer_type(type);
pipe->set_peer_id(num);
pipe->set_peer_addr(addr);
pipe->policy = get_policy(type);
pipe->start_writer();
Expand Down Expand Up @@ -400,7 +401,7 @@ ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
if (pipe) {
ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
} else {
pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
pipe = connect_rank(dest.addr, dest.name.type(), dest.name.num(), NULL, NULL);
ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
}
Mutex::Locker l(pipe->pipe_lock);
Expand All @@ -416,7 +417,7 @@ ConnectionRef SimpleMessenger::get_loopback_connection()
}

void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
const entity_addr_t& dest_addr, int dest_type,
const entity_addr_t& dest_addr, int dest_type, int64_t dest_num,
bool already_locked)
{
if (cct->_conf->ms_dump_on_send) {
Expand Down Expand Up @@ -490,9 +491,9 @@ void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
* grab the lock and do it again. If we got here, we know it's a non-lossy
* Connection, so we can use our existing pointer without doing another lookup. */
Mutex::Locker l(lock);
submit_message(m, con, dest_addr, dest_type, true);
submit_message(m, con, dest_addr, dest_type, dest_num, true);
} else {
connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m);
connect_rank(dest_addr, dest_type, dest_num, static_cast<PipeConnection*>(con), m);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/msg/simple/SimpleMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ class SimpleMessenger : public SimplePolicyMessenger {
* @return a pointer to the newly-created Pipe. Caller does not own a
* reference; take one if you need it.
*/
Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
Message *first);
Pipe *connect_rank(const entity_addr_t& addr, int type, int64_t num,
PipeConnection *con, Message *first);
/**
* Send a message, lazily or not.
* This just glues send_message together and passes
Expand Down Expand Up @@ -250,7 +250,7 @@ class SimpleMessenger : public SimplePolicyMessenger {
* without locking, you MUST have filled in the con with a valid pointer.
*/
void submit_message(Message *m, PipeConnection *con,
const entity_addr_t& addr, int dest_type,
const entity_addr_t& addr, int dest_type, int64_t dest_num,
bool already_locked);
/**
* Look through the pipes in the pipe_reap_queue and tear them down.
Expand Down

0 comments on commit 6841eff

Please sign in to comment.