Skip to content

Commit

Permalink
msg/simple: mark daemons down on RST + ECONNREFUSED
Browse files Browse the repository at this point in the history
When a daemon goes down (because it was killed, or terminated itself on a failed
assert) on an otherwise healthy machine, the TCP stack takes care of its connections
and sends RST (connection reset) packets to all daemons that were connected
to the downed daemon. OSDs already handle that and attempt to reconnect, with the
grace timer starting to count. When the grace timer runs out and the daemons
still cannot reconnect, the OSD is marked down.
This changeset adds an additional handler (ms_handle_refused()) to the dispatchers,
plus code that detects when a connection attempt fails with the ECONNREFUSED error
(connection refused). That error is a clear indication that the host is alive but
the daemon isn't, so daemons can instantly mark the other side as undoubtedly
down without waiting for the grace timer.
This changeset also records the peer id on connections, so figuring out which OSD
actually failed is a bit easier.
In its current state, only OSDs take advantage of the ms_handle_refused() facility,
but I don't see why other daemons shouldn't.

Signed-off-by: Piotr Dałek <piotr.dalek@ts.fujitsu.com>
  • Loading branch information
Piotr Dałek authored and branch-predictor committed May 5, 2016
1 parent 64da723 commit b2765f4
Show file tree
Hide file tree
Showing 32 changed files with 172 additions and 17 deletions.
6 changes: 6 additions & 0 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11931,6 +11931,12 @@ void Client::ms_handle_remote_reset(Connection *con)
}
}

/**
 * Messenger callback invoked when a connection attempt on @p con is refused.
 *
 * Only logs the peer address (debug level 1); no other action is taken.
 *
 * @param con the Connection whose connect attempt was refused
 * @return always false
 */
bool Client::ms_handle_refused(Connection *con)
{
  ldout(cct, 1) << "ms_handle_refused on "
                << con->get_peer_addr()
                << dendl;
  return false;
}

bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
{
if (dest_type == CEPH_ENTITY_TYPE_MON)
Expand Down
1 change: 1 addition & 0 deletions src/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ class Client : public Dispatcher, public md_config_obs_t {
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
bool ms_handle_refused(Connection *con);
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);

int authenticate();
Expand Down
4 changes: 4 additions & 0 deletions src/librados/RadosClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ void librados::RadosClient::ms_handle_remote_reset(Connection *con)
{
}

/**
 * Messenger callback for a refused connection attempt.
 *
 * RadosClient takes no action on this event.
 *
 * @param con the Connection whose connect attempt was refused (unused)
 * @return always false
 */
bool librados::RadosClient::ms_handle_refused(Connection *con)
{
  // Nothing to do here.
  return false;
}

bool librados::RadosClient::_dispatch(Message *m)
{
Expand Down
1 change: 1 addition & 0 deletions src/librados/RadosClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class librados::RadosClient : public Dispatcher
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
bool ms_handle_refused(Connection *con);

Objecter *objecter;

Expand Down
1 change: 1 addition & 0 deletions src/mds/Beacon.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class Beacon : public Dispatcher
// Connection-event callbacks. None of them takes any action; the
// bool-returning ones always return false.
void ms_handle_connect(Connection *c)
{
}
bool ms_handle_reset(Connection *c)
{
  return false;
}
void ms_handle_remote_reset(Connection *c)
{
}
bool ms_handle_refused(Connection *c)
{
  return false;
}

void notify_mdsmap(MDSMap const *mdsmap);
void notify_health(MDSRank const *mds);
Expand Down
6 changes: 6 additions & 0 deletions src/mds/MDSDaemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,12 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con)
}
}

/**
 * Messenger callback for a refused connection attempt.
 *
 * Currently a placeholder: the event is ignored.
 *
 * @param con the Connection whose connect attempt was refused (unused)
 * @return always false
 */
bool MDSDaemon::ms_handle_refused(Connection *con)
{
  // Nothing is done with this event for now.
  return false;
}

bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& is_valid, CryptoKey& session_key)
Expand Down
1 change: 1 addition & 0 deletions src/mds/MDSDaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class MDSDaemon : public Dispatcher, public md_config_obs_t {
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
bool ms_handle_refused(Connection *con);

protected:
// admin socket handling
Expand Down
4 changes: 4 additions & 0 deletions src/mon/MonClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ struct MonClientPinger : public Dispatcher {
return true;
}
// Remote-reset events are ignored.
void ms_handle_remote_reset(Connection *con)
{
}
// Refused-connection events are ignored; always returns false.
bool ms_handle_refused(Connection *con)
{
  return false;
}
};

class MonClient : public Dispatcher {
Expand Down Expand Up @@ -145,6 +148,7 @@ class MonClient : public Dispatcher {
bool ms_dispatch(Message *m);
bool ms_handle_reset(Connection *con);
// Remote-reset events are ignored.
void ms_handle_remote_reset(Connection *con)
{
}
// Refused-connection events are ignored; always returns false.
bool ms_handle_refused(Connection *con)
{
  return false;
}

void handle_monmap(MMonMap *m);

Expand Down
8 changes: 8 additions & 0 deletions src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3163,6 +3163,7 @@ void Monitor::handle_forward(MonOpRequestRef op)
c->set_priv(s->get());
c->set_peer_addr(m->client.addr);
c->set_peer_type(m->client.name.type());
c->set_peer_id(m->client.name.num());
c->set_features(m->con_features);

s->caps = m->client_caps;
Expand Down Expand Up @@ -4355,6 +4356,13 @@ bool Monitor::ms_handle_reset(Connection *con)
return true;
}

/**
 * Messenger callback invoked when a connection attempt on @p con is refused.
 *
 * For now this only emits a debug-level-10 log line with the connection
 * pointer and the peer address.
 *
 * @param con the Connection whose connect attempt was refused
 * @return always false
 */
bool Monitor::ms_handle_refused(Connection *con)
{
  dout(10) << "ms_handle_refused "
           << con << " "
           << con->get_peer_addr()
           << dendl;
  return false;
}

void Monitor::check_subs()
{
string type = "monmap";
Expand Down
1 change: 1 addition & 0 deletions src/mon/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ class Monitor : public Dispatcher,
bool& isvalid, CryptoKey& session_key);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
bool ms_handle_refused(Connection *con);

int write_default_keyring(bufferlist& bl);
void extract_save_mon_key(KeyRing& keyring);
Expand Down
3 changes: 3 additions & 0 deletions src/msg/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct Connection : public RefCountedObject {
Messenger *msgr;
RefCountedObject *priv;
int peer_type;
int64_t peer_id;
entity_addr_t peer_addr;
utime_t last_keepalive, last_keepalive_ack;
private:
Expand Down Expand Up @@ -153,6 +154,8 @@ struct Connection : public RefCountedObject {

// Peer entity type (compared against the CEPH_ENTITY_TYPE_* constants below).
int get_peer_type() const
{
  return peer_type;
}
void set_peer_type(int t)
{
  peer_type = t;
}

// Peer id, stored as given to set_peer_id().
int64_t get_peer_id() const
{
  return peer_id;
}
void set_peer_id(int64_t id)
{
  peer_id = id;
}

// True when the peer type is CEPH_ENTITY_TYPE_MON.
bool peer_is_mon() const
{
  return CEPH_ENTITY_TYPE_MON == peer_type;
}
// True when the peer type is CEPH_ENTITY_TYPE_MDS.
bool peer_is_mds() const
{
  return CEPH_ENTITY_TYPE_MDS == peer_type;
}
Expand Down
10 changes: 10 additions & 0 deletions src/msg/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ class Dispatcher {
*/
virtual void ms_handle_remote_reset(Connection *con) = 0;

/**
* This indicates that the connection is both broken and further
* connection attempts are failing because other side refuses
* it.
*
* @param con The Connection which broke. You are not granted
* a reference to it.
*/
virtual bool ms_handle_refused(Connection *con) = 0;

/**
* @defgroup Authentication
* @{
Expand Down
18 changes: 18 additions & 0 deletions src/msg/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,24 @@ class Messenger {
++p)
(*p)->ms_handle_remote_reset(con);
}

/**
 * Offer a "connection refused" event for a Connection to the registered
 * Dispatchers. Intended to be called when reconnection attempts on the
 * Connection are being refused by the other side.
 *
 * Dispatchers are visited in list order, and delivery stops at the first
 * one whose ms_handle_refused() returns true.
 *
 * @param con Pointer to the broken Connection.
 */
void ms_deliver_handle_refused(Connection *con) {
  list<Dispatcher*>::iterator it = dispatchers.begin();
  while (it != dispatchers.end()) {
    const bool handled = (*it)->ms_handle_refused(con);
    if (handled)
      return;
    ++it;
  }
}

/**
* Get the AuthAuthorizer for a new outgoing Connection.
*
Expand Down
9 changes: 6 additions & 3 deletions src/msg/simple/DispatchQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,12 @@ void DispatchQueue::entry()
case D_ACCEPT:
msgr->ms_deliver_handle_accept(qitem.get_connection());
break;
case D_BAD_RESET:
msgr->ms_deliver_handle_reset(qitem.get_connection());
break;
case D_BAD_RESET:
msgr->ms_deliver_handle_reset(qitem.get_connection());
break;
case D_CONN_REFUSED:
msgr->ms_deliver_handle_refused(qitem.get_connection());
break;
default:
assert(0);
}
Expand Down
13 changes: 12 additions & 1 deletion src/msg/simple/DispatchQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class DispatchQueue {

uint64_t next_pipe_id;

enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };

/**
* The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
Expand Down Expand Up @@ -175,6 +175,17 @@ class DispatchQueue {
cond.Signal();
}

/**
 * Queue a D_CONN_REFUSED event for @p con at CEPH_MSG_PRIO_HIGHEST and
 * signal the queue's condition variable.
 *
 * Runs under `lock`; does nothing once `stop` is set.
 *
 * @param con the Connection whose connect attempt was refused
 */
void queue_refused(Connection *con) {
  Mutex::Locker l(lock);
  if (!stop) {
    mqueue.enqueue_strict(0,
                          CEPH_MSG_PRIO_HIGHEST,
                          QueueItem(D_CONN_REFUSED, con));
    cond.Signal();
  }
}

bool can_fast_dispatch(Message *m) const;
void fast_dispatch(Message *m);
void fast_preprocess(Message *m);
Expand Down
11 changes: 8 additions & 3 deletions src/msg/simple/Pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -914,9 +914,14 @@ int Pipe::connect()
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, (sockaddr*)&peer_addr.addr, peer_addr.addr_size());
if (rc < 0) {
rc = -errno;
int stored_errno = errno;
ldout(msgr->cct,2) << "connect error " << peer_addr
<< ", " << cpp_strerror(rc) << dendl;
<< ", " << cpp_strerror(stored_errno) << dendl;
if (stored_errno == ECONNREFUSED) {
ldout(msgr->cct, 10) << "connection refused!" << dendl;
msgr->dispatch_queue.queue_refused(connection_state.get());
}

goto fail;
}

Expand Down Expand Up @@ -1346,7 +1351,7 @@ void Pipe::fault(bool onread)
ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
return;
}

ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;

if (state == STATE_CLOSED ||
Expand Down
6 changes: 6 additions & 0 deletions src/msg/simple/Pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
public:
int port;
int peer_type;
int64_t peer_id;
entity_addr_t peer_addr;
Messenger::Policy policy;

Expand Down Expand Up @@ -291,6 +292,11 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
connection_state->set_peer_type(t);
}

/**
 * Record the peer's id on this Pipe and mirror it onto the attached
 * connection_state.
 *
 * @param t the peer id
 */
void set_peer_id(int64_t t)
{
  peer_id = t;
  connection_state->set_peer_id(t);
}

void register_pipe();
void unregister_pipe();
void join();
Expand Down
15 changes: 8 additions & 7 deletions src/msg/simple/SimpleMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest)
lock.Lock();
Pipe *pipe = _lookup_pipe(dest.addr);
submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
dest.addr, dest.name.type(), true);
dest.addr, dest.name.type(), dest.name.num(), true);
lock.Unlock();
return 0;
}
Expand All @@ -139,7 +139,7 @@ int SimpleMessenger::_send_message(Message *m, Connection *con)
<< dendl;

submit_message(m, static_cast<PipeConnection*>(con),
con->get_peer_addr(), con->get_peer_type(), false);
con->get_peer_addr(), con->get_peer_type(), con->get_peer_id(), false);
return 0;
}

Expand Down Expand Up @@ -355,7 +355,7 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd)
* NOTE: assumes messenger.lock held.
*/
Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
int type,
int type, int64_t num,
PipeConnection *con,
Message *first)
{
Expand All @@ -369,6 +369,7 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
static_cast<PipeConnection*>(con));
pipe->pipe_lock.Lock();
pipe->set_peer_type(type);
pipe->set_peer_id(num);
pipe->set_peer_addr(addr);
pipe->policy = get_policy(type);
pipe->start_writer();
Expand Down Expand Up @@ -412,7 +413,7 @@ ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
if (pipe) {
ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
} else {
pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
pipe = connect_rank(dest.addr, dest.name.type(), dest.name.num(), NULL, NULL);
ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
}
Mutex::Locker l(pipe->pipe_lock);
Expand All @@ -428,7 +429,7 @@ ConnectionRef SimpleMessenger::get_loopback_connection()
}

void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
const entity_addr_t& dest_addr, int dest_type,
const entity_addr_t& dest_addr, int dest_type, int64_t dest_num,
bool already_locked)
{
if (cct->_conf->ms_dump_on_send) {
Expand Down Expand Up @@ -501,9 +502,9 @@ void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
* grab the lock and do it again. If we got here, we know it's a non-lossy
* Connection, so we can use our existing pointer without doing another lookup. */
Mutex::Locker l(lock);
submit_message(m, con, dest_addr, dest_type, true);
submit_message(m, con, dest_addr, dest_type, dest_num, true);
} else {
connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m);
connect_rank(dest_addr, dest_type, dest_num, static_cast<PipeConnection*>(con), m);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/msg/simple/SimpleMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ class SimpleMessenger : public SimplePolicyMessenger {
* @return a pointer to the newly-created Pipe. Caller does not own a
* reference; take one if you need it.
*/
Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
Message *first);
Pipe *connect_rank(const entity_addr_t& addr, int type, int64_t num,
PipeConnection *con, Message *first);
/**
* Send a message, lazily or not.
* This just glues send_message together and passes
Expand Down Expand Up @@ -250,7 +250,7 @@ class SimpleMessenger : public SimplePolicyMessenger {
* without locking, you MUST have filled in the con with a valid pointer.
*/
void submit_message(Message *m, PipeConnection *con,
const entity_addr_t& addr, int dest_type,
const entity_addr_t& addr, int dest_type, int64_t dest_num,
bool already_locked);
/**
* Look through the pipes in the pipe_reap_queue and tear them down.
Expand Down
24 changes: 24 additions & 0 deletions src/osd/OSD.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4698,6 +4698,30 @@ struct C_OSD_GetVersion : public Context {
}
};

/**
 * Messenger callback invoked when a connection attempt on @p con is refused.
 *
 * If the peer is an OSD that the current OSDMap still reports as up, an
 * MOSDFailure is sent to the monitors immediately, rather than waiting for
 * the heartbeat grace period to run out.
 *
 * @param con the Connection whose connect attempt was refused
 * @return false when the connection has no session attached (the event is
 *         not handled here); true otherwise
 */
bool OSD::ms_handle_refused(Connection *con)
{
  // get_priv() hands back its own counted reference to the session. It must
  // be released with put() on every path that got a non-null session,
  // otherwise each refused-connection event leaks one Session reference.
  OSD::Session *session = static_cast<OSD::Session *>(con->get_priv());
  dout(1) << "ms_handle_refused con " << con << " session " << session << dendl;
  if (!session)
    return false;

  const int id = con->get_peer_id();
  const int type = con->get_peer_type();
  // handle only OSD failures here
  if (monc && (type == CEPH_ENTITY_TYPE_OSD)) {
    OSDMapRef osdmap = get_osdmap();
    if (osdmap && osdmap->is_up(id)) {
      // I'm cheating mon heartbeat grace logic, because we know it's not going
      // to respawn alone. +1 so we won't hit any boundary case.
      monc->send_mon_message(new MOSDFailure(monc->get_fsid(),
                                             osdmap->get_inst(id),
                                             cct->_conf->osd_heartbeat_grace + 1,
                                             osdmap->get_epoch()
                                             ));
    }
  }

  // Drop the reference taken by get_priv() above.
  session->put();
  return true;
}

void OSD::start_boot()
{
if (!_is_healthy()) {
Expand Down
4 changes: 4 additions & 0 deletions src/osd/OSD.h
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,9 @@ class OSD : public Dispatcher,
// Connection resets are forwarded to the OSD's heartbeat_reset().
bool ms_handle_reset(Connection *con)
{
  return osd->heartbeat_reset(con);
}
// Refused connections are forwarded to the OSD's ms_handle_refused().
bool ms_handle_refused(Connection *con)
{
  return osd->ms_handle_refused(con);
}
// Remote resets are ignored.
void ms_handle_remote_reset(Connection *con)
{
}
bool ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
Expand Down Expand Up @@ -2349,6 +2352,7 @@ class OSD : public Dispatcher,
void ms_handle_fast_accept(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
bool ms_handle_refused(Connection *con);

io_queue get_io_queue() const {
if (cct->_conf->osd_op_queue == "debug_random") {
Expand Down

0 comments on commit b2765f4

Please sign in to comment.