Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg: mark daemons down on RST + ECONNREFUSED #8558

Merged
merged 6 commits into from
Sep 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12380,6 +12380,12 @@ void Client::ms_handle_remote_reset(Connection *con)
}
}

/**
 * Dispatcher callback for a connection whose connect attempt was refused
 * by the peer. The client only logs the peer address at debug level 1
 * and takes no further action.
 *
 * @param con the connection that was refused
 * @return always false
 */
bool Client::ms_handle_refused(Connection *con)
{
ldout(cct, 1) << "ms_handle_refused on " << con->get_peer_addr() << dendl;
return false;
}

bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
{
if (dest_type == CEPH_ENTITY_TYPE_MON)
Expand Down
1 change: 1 addition & 0 deletions src/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ class Client : public Dispatcher, public md_config_obs_t {
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
bool ms_handle_refused(Connection *con);
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);

int authenticate();
Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track
OPTION(osd_target_transaction_size, OPT_INT, 30) // to adjust various transactions that batch smaller items
OPTION(osd_failsafe_full_ratio, OPT_FLOAT, .97) // what % full makes an OSD "full" (failsafe)
OPTION(osd_failsafe_nearfull_ratio, OPT_FLOAT, .90) // what % full makes an OSD near full (failsafe)
OPTION(osd_fast_fail_on_connection_refused, OPT_BOOL, true) // immediately mark OSDs as down once they refuse to accept connections

OPTION(osd_pg_object_context_cache_count, OPT_INT, 64)
OPTION(osd_tracing, OPT_BOOL, false) // true if LTTng-UST tracepoints should be enabled
Expand Down
4 changes: 4 additions & 0 deletions src/librados/RadosClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ void librados::RadosClient::ms_handle_remote_reset(Connection *con)
{
}

/**
 * Connection-refused notification for the librados client.
 * The event is ignored and reported as not handled.
 *
 * @param con the connection that was refused (unused)
 * @return always false
 */
bool librados::RadosClient::ms_handle_refused(Connection *con)
{
  const bool handled = false;  // no refused-connection handling here
  return handled;
}

bool librados::RadosClient::_dispatch(Message *m)
{
Expand Down
1 change: 1 addition & 0 deletions src/librados/RadosClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class librados::RadosClient : public Dispatcher
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
bool ms_handle_refused(Connection *con);

Objecter *objecter;

Expand Down
1 change: 1 addition & 0 deletions src/mds/Beacon.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class Beacon : public Dispatcher
void ms_handle_connect(Connection *c) {}
bool ms_handle_reset(Connection *c) {return false;}
void ms_handle_remote_reset(Connection *c) {}
// Refused connections are ignored here; always reports "not handled".
bool ms_handle_refused(Connection *c) { return false; }

void notify_mdsmap(MDSMap const *mdsmap);
void notify_health(MDSRank const *mds);
Expand Down
6 changes: 6 additions & 0 deletions src/mds/MDSDaemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,12 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con)
}
}

/**
 * Connection-refused notification for the MDS daemon.
 * Currently a placeholder: the event is not acted upon.
 *
 * @param con the connection that was refused (unused)
 * @return always false
 */
bool MDSDaemon::ms_handle_refused(Connection *con)
{
  // nothing to do for now
  const bool handled = false;
  return handled;
}

bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& is_valid, CryptoKey& session_key)
Expand Down
1 change: 1 addition & 0 deletions src/mds/MDSDaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class MDSDaemon : public Dispatcher, public md_config_obs_t {
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
bool ms_handle_refused(Connection *con);

protected:
// admin socket handling
Expand Down
34 changes: 27 additions & 7 deletions src/messages/MOSDFailure.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,41 @@ class MOSDFailure : public PaxosServiceMessage {
static const int HEAD_VERSION = 3;

public:
enum {
FLAG_ALIVE = 0, // use this on its own to mark as "I'm still alive"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave this out..

FLAG_FAILED = 1, // if set, failure; if not, recovery
FLAG_IMMEDIATE = 2, // known failure, not a timeout
};

uuid_d fsid;
entity_inst_t target_osd;
__u8 is_failed;
__u8 flags;
epoch_t epoch;
int32_t failed_for; // known to be failed since at least this long

MOSDFailure() : PaxosServiceMessage(MSG_OSD_FAILURE, 0, HEAD_VERSION) { }
MOSDFailure(const uuid_d &fs, const entity_inst_t& f, int duration, epoch_t e)
: PaxosServiceMessage(MSG_OSD_FAILURE, e, HEAD_VERSION),
fsid(fs), target_osd(f), is_failed(true), epoch(e), failed_for(duration) { }
fsid(fs), target_osd(f),
flags(FLAG_FAILED),
epoch(e), failed_for(duration) { }
MOSDFailure(const uuid_d &fs, const entity_inst_t& f, int duration,
epoch_t e, __u8 extra_flags)
: PaxosServiceMessage(MSG_OSD_FAILURE, e, HEAD_VERSION),
fsid(fs), target_osd(f),
flags(extra_flags),
epoch(e), failed_for(duration) { }
private:
~MOSDFailure() {}

public:
entity_inst_t get_target() { return target_osd; }
bool if_osd_failed() { return is_failed; }
bool if_osd_failed() const {
return flags & FLAG_FAILED;
}
bool is_immediate() const {
return flags & FLAG_IMMEDIATE;
}
epoch_t get_epoch() { return epoch; }

void decode_payload() {
Expand All @@ -49,9 +68,9 @@ class MOSDFailure : public PaxosServiceMessage {
::decode(target_osd, p);
::decode(epoch, p);
if (header.version >= 2)
::decode(is_failed, p);
::decode(flags, p);
else
is_failed = true;
flags = FLAG_FAILED;
if (header.version >= 3)
::decode(failed_for, p);
else
Expand All @@ -63,14 +82,15 @@ class MOSDFailure : public PaxosServiceMessage {
::encode(fsid, payload);
::encode(target_osd, payload, features);
::encode(epoch, payload);
::encode(is_failed, payload);
::encode(flags, payload);
::encode(failed_for, payload);
}

const char *get_type_name() const { return "osd_failure"; }
void print(ostream& out) const {
out << "osd_failure("
<< (is_failed ? "failed " : "recovered ")
<< (if_osd_failed() ? "failed " : "recovered ")
<< (is_immediate() ? "immediate " : "timeout ")
<< target_osd << " for " << failed_for << "sec e" << epoch
<< " v" << version << ")";
}
Expand Down
4 changes: 4 additions & 0 deletions src/mon/MonClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ struct MonClientPinger : public Dispatcher {
return true;
}
void ms_handle_remote_reset(Connection *con) {}
// Refused connections are not acted upon; always reports "not handled".
bool ms_handle_refused(Connection *con) { return false; }
};

class MonClient : public Dispatcher {
Expand Down Expand Up @@ -140,6 +143,7 @@ class MonClient : public Dispatcher {
bool ms_dispatch(Message *m);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
// Refused connections are not acted upon; always reports "not handled".
bool ms_handle_refused(Connection *con) {
  return false;
}

void handle_monmap(MMonMap *m);

Expand Down
7 changes: 7 additions & 0 deletions src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4398,6 +4398,13 @@ bool Monitor::ms_handle_reset(Connection *con)
return true;
}

/**
 * Dispatcher callback for a connection whose connect attempt was refused
 * by the peer. The monitor only logs the connection pointer and peer
 * address at debug level 10.
 *
 * @param con the connection that was refused
 * @return always false
 */
bool Monitor::ms_handle_refused(Connection *con)
{
// just log for now...
dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
return false;
}

void Monitor::check_subs()
{
string type = "monmap";
Expand Down
1 change: 1 addition & 0 deletions src/mon/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ class Monitor : public Dispatcher,
bool& isvalid, CryptoKey& session_key);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
bool ms_handle_refused(Connection *con);

int write_default_keyring(bufferlist& bl);
void extract_save_mon_key(KeyRing& keyring);
Expand Down
25 changes: 24 additions & 1 deletion src/mon/OSDMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1820,6 +1820,22 @@ bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
return false;
}

/**
 * Record an immediate failure for an OSD in the pending incremental,
 * without going through the usual report-collection logic.
 *
 * If CEPH_OSD_UP is already set for the OSD in pending_inc.new_state the
 * call is a no-op apart from a debug message. Otherwise the entry is set
 * to CEPH_OSD_UP and the forced failure is written to the cluster log.
 *
 * @param now        current time (not used by this function)
 * @param target_osd id of the OSD to mark as failed
 */
void OSDMonitor::force_failure(utime_t now, int target_osd)
{
  // Skip OSDs that already have a failure queued in the pending map.
  const bool already_pending =
    pending_inc.new_state.count(target_osd) &&
    (pending_inc.new_state[target_osd] & CEPH_OSD_UP);
  if (already_pending) {
    dout(10) << " already pending failure" << dendl;
    return;
  }

  dout(1) << " we're forcing failure of osd." << target_osd << dendl;
  pending_inc.new_state[target_osd] = CEPH_OSD_UP;

  mon->clog->info() << osdmap.get_inst(target_osd) << " failed (forced)\n";
}

bool OSDMonitor::prepare_failure(MonOpRequestRef op)
{
op->mark_osdmon_event(__func__);
Expand All @@ -1841,8 +1857,15 @@ bool OSDMonitor::prepare_failure(MonOpRequestRef op)

if (m->if_osd_failed()) {
// add a report
if (m->is_immediate()) {
mon->clog->debug() << m->get_target() << " reported immediately failed by "
<< m->get_orig_source_inst() << "\n";
force_failure(now, target_osd);
return true;
}
mon->clog->debug() << m->get_target() << " reported failed by "
<< m->get_orig_source_inst() << "\n";
<< m->get_orig_source_inst() << "\n";

failure_info_t& fi = failure_info[target_osd];
MonOpRequestRef old_op = fi.add_report(reporter, failed_since, op);
if (old_op) {
Expand Down
1 change: 1 addition & 0 deletions src/mon/OSDMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class OSDMonitor : public PaxosService {

bool check_failures(utime_t now);
bool check_failure(utime_t now, int target_osd, failure_info_t& fi);
void force_failure(utime_t now, int target_osd);

// map thrashing
int thrash_map;
Expand Down
3 changes: 3 additions & 0 deletions src/msg/DispatchQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ void DispatchQueue::entry()
case D_BAD_RESET:
msgr->ms_deliver_handle_reset(qitem.get_connection());
break;
case D_CONN_REFUSED:
msgr->ms_deliver_handle_refused(qitem.get_connection());
break;
default:
assert(0);
}
Expand Down
12 changes: 11 additions & 1 deletion src/msg/DispatchQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class DispatchQueue {

uint64_t next_id;

enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };

/**
* The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
Expand Down Expand Up @@ -184,6 +184,16 @@ class DispatchQueue {
QueueItem(D_BAD_RESET, con));
cond.Signal();
}
/**
 * Queue a "connection refused" event for the given connection.
 *
 * Takes the queue lock, and unless the queue has been stopped, enqueues
 * a D_CONN_REFUSED item at CEPH_MSG_PRIO_HIGHEST and signals the
 * condition variable.
 *
 * @param con the connection whose connect attempt was refused
 */
void queue_refused(Connection *con) {
  Mutex::Locker guard(lock);
  if (!stop) {
    mqueue.enqueue_strict(
      0,
      CEPH_MSG_PRIO_HIGHEST,
      QueueItem(D_CONN_REFUSED, con));
    cond.Signal();
  }
}

bool can_fast_dispatch(Message *m) const;
void fast_dispatch(Message *m);
Expand Down
10 changes: 10 additions & 0 deletions src/msg/Dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ class Dispatcher {
*/
virtual void ms_handle_remote_reset(Connection *con) = 0;

/**
 * This indicates that the connection is broken and that further
 * connection attempts are failing because the other side refuses
 * them.
 *
 * @param con The Connection which broke. You are not granted
 * a reference to it.
 * @return true if this Dispatcher handled the event; the Messenger
 * then stops offering it to the remaining Dispatchers. false otherwise.
 */
virtual bool ms_handle_refused(Connection *con) = 0;

/**
* @defgroup Authentication
* @{
Expand Down
18 changes: 18 additions & 0 deletions src/msg/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,24 @@ class Messenger {
++p)
(*p)->ms_handle_remote_reset(con);
}

/**
 * Tell the registered Dispatchers that reconnection attempts on a
 * Connection are being refused. Call this whenever you detect that a
 * lossy Connection has been disconnected and cannot be re-established.
 *
 * Dispatchers are offered the event in list order; delivery stops at
 * the first one whose ms_handle_refused() returns true.
 *
 * @param con Pointer to the broken Connection.
 */
void ms_deliver_handle_refused(Connection *con) {
  list<Dispatcher*>::iterator d = dispatchers.begin();
  while (d != dispatchers.end()) {
    if ((*d)->ms_handle_refused(con))
      break;
    ++d;
  }
}

/**
* Get the AuthAuthorizer for a new outgoing Connection.
*
Expand Down
4 changes: 4 additions & 0 deletions src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,10 @@ ssize_t AsyncConnection::_process_connection()
r = cs.is_connected();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
if (r == -ECONNREFUSED) {
ldout(async_msgr->cct, 2) << __func__ << " connection refused!" << dendl;
dispatch_queue->queue_refused(this);
}
goto fail;
} else if (r == 0) {
ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
Expand Down
8 changes: 6 additions & 2 deletions src/msg/simple/Pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,9 +924,13 @@ int Pipe::connect()
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
if (rc < 0) {
rc = -errno;
int stored_errno = errno;
ldout(msgr->cct,2) << "connect error " << peer_addr
<< ", " << cpp_strerror(rc) << dendl;
<< ", " << cpp_strerror(stored_errno) << dendl;
if (stored_errno == ECONNREFUSED) {
ldout(msgr->cct, 2) << "connection refused!" << dendl;
msgr->dispatch_queue.queue_refused(connection_state.get());
}
goto fail;
}

Expand Down
34 changes: 32 additions & 2 deletions src/osd/OSD.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4776,6 +4776,37 @@ bool OSD::ms_handle_reset(Connection *con)
return true;
}

/**
 * Dispatcher callback for a connection whose connect attempt was refused.
 *
 * When osd_fast_fail_on_connection_refused is enabled and the peer is an
 * OSD that the current osdmap still considers up, an MOSDFailure carrying
 * FLAG_IMMEDIATE | FLAG_FAILED is sent to the monitor so the peer can be
 * marked down without waiting for heartbeat timeouts.
 *
 * @param con the connection that was refused
 * @return false if the option is disabled or the connection has no
 *         session attached; true otherwise (even if no report was sent)
 */
bool OSD::ms_handle_refused(Connection *con)
{
// feature can be switched off via configuration
if (!cct->_conf->osd_fast_fail_on_connection_refused)
return false;

// NOTE(review): get_priv() presumably hands back a reference that this
// function owns; it is released by session->put() before returning true.
OSD::Session *session = (OSD::Session *)con->get_priv();
dout(1) << "ms_handle_refused con " << con << " session " << session << dendl;
if (!session)
return false;
int type = con->get_peer_type();
// handle only OSD failures here
if (monc && (type == CEPH_ENTITY_TYPE_OSD)) {
OSDMapRef osdmap = get_osdmap();
if (osdmap) {
// map the refused address back to an OSD id; negative means no match
int id = osdmap->identify_osd_on_all_channels(con->get_peer_addr());
if (id >= 0 && osdmap->is_up(id)) {
// I'm cheating mon heartbeat grace logic, because we know it's not going
// to respawn alone. +1 so we won't hit any boundary case.
monc->send_mon_message(new MOSDFailure(monc->get_fsid(),
osdmap->get_inst(id),
cct->_conf->osd_heartbeat_grace + 1,
osdmap->get_epoch(),
MOSDFailure::FLAG_IMMEDIATE | MOSDFailure::FLAG_FAILED
));
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session leak, session->put()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// drop the session reference taken above
session->put();
return true;
}

struct C_OSD_GetVersion : public Context {
OSD *osd;
uint64_t oldest, newest;
Expand Down Expand Up @@ -5085,8 +5116,7 @@ void OSD::send_failures()

void OSD::send_still_alive(epoch_t epoch, const entity_inst_t &i)
{
MOSDFailure *m = new MOSDFailure(monc->get_fsid(), i, 0, epoch);
m->is_failed = false;
MOSDFailure *m = new MOSDFailure(monc->get_fsid(), i, 0, epoch, MOSDFailure::FLAG_ALIVE);
monc->send_mon_message(m);
}

Expand Down