Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg/async: harden error logic handle #9781

Merged
merged 5 commits into from Jun 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -112,7 +112,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
EventCenter *c, PerfCounters *p)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
Expand Down Expand Up @@ -354,6 +354,9 @@ ssize_t AsyncConnection::_try_send(bool more)
if (open_write && !is_queued()) {
center->delete_file_event(sd, EVENT_WRITABLE);
open_write = false;

if (state_after_send != STATE_NONE)
center->dispatch_event_external(read_handler);
}

return outcoming_bl.length();
Expand Down Expand Up @@ -962,7 +965,7 @@ ssize_t AsyncConnection::_process_connection()
if (!outcoming_bl.length()) {
assert(state_after_send);
state = state_after_send;
state_after_send = 0;
state_after_send = STATE_NONE;
}
break;
}
Expand Down Expand Up @@ -2005,6 +2008,7 @@ void AsyncConnection::requeue_sent()
ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
<< " (" << m->get_seq() << ")" << dendl;
rq.push_front(make_pair(bufferlist(), m));
out_seq.dec();
}
}

Expand All @@ -2023,6 +2027,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq)
<< " <= " << seq << ", discarding" << dendl;
p.second->put();
rq.pop_front();
out_seq.inc();
}
if (rq.empty())
out_q.erase(CEPH_MSG_PRIO_HIGHEST);
Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -367,9 +367,9 @@ class AsyncConnection : public Connection {
void process();
void wakeup_from(uint64_t id);
void local_deliver();
void stop() {
void stop(bool queue_reset) {
lock.Lock();
bool need_queue_reset = (state != STATE_CLOSED);
bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
lock.Unlock();
mark_down();
if (need_queue_reset)
Expand Down
48 changes: 25 additions & 23 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -277,13 +277,11 @@ int Processor::start(Worker *w)
void Processor::accept()
{
ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl;
int errors = 0;
while (errors < 4) {
while (true) {
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
if (sd >= 0) {
errors = 0;
ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;

msgr->add_accept(sd);
Expand All @@ -293,10 +291,18 @@ void Processor::accept()
continue;
} else if (errno == EAGAIN) {
break;
} else if (errno == EMFILE || errno == ENFILE) {
lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
break;
} else if (errno == ECONNABORTED) {
ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
continue;
} else {
errors++;
ldout(msgr->cct, 20) << __func__ << " no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
lderr(msgr->cct) << __func__ << " no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
break;
}
}
}
Expand Down Expand Up @@ -571,16 +577,17 @@ int AsyncMessenger::shutdown()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;

// break ref cycles on the loopback connection
processor.stop();
mark_down_all();
dispatch_queue.shutdown();
// break ref cycles on the loopback connection
local_connection->set_priv(NULL);
pool->barrier();
// done! clean up.
processor.stop();
did_bind = false;
lock.Lock();
stop_cond.Signal();
lock.Unlock();
stopped = true;
lock.Unlock();
pool->barrier();
return 0;
}

Expand Down Expand Up @@ -653,22 +660,17 @@ void AsyncMessenger::wait()

lock.Unlock();

// done! clean up.
ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
processor.stop();
did_bind = false;
ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;

// close all connections
mark_down_all();

dispatch_queue.shutdown();
if (dispatch_queue.is_started()) {
ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
dispatch_queue.wait();
dispatch_queue.discard_local();
ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
}

// close all connections
shutdown_connections(false);

ldout(cct, 10) << __func__ << ": done." << dendl;
ldout(cct, 1) << __func__ << " complete." << dendl;
started = false;
Expand Down Expand Up @@ -809,15 +811,15 @@ int AsyncMessenger::send_keepalive(Connection *con)
return 0;
}

void AsyncMessenger::mark_down_all()
void AsyncMessenger::shutdown_connections(bool queue_reset)
{
ldout(cct,1) << __func__ << " " << dendl;
lock.Lock();
for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
q != accepting_conns.end(); ++q) {
AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
p->stop();
p->stop(queue_reset);
}
accepting_conns.clear();

Expand All @@ -827,7 +829,7 @@ void AsyncMessenger::mark_down_all()
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
p->get_perf_counter()->dec(l_msgr_active_connections);
p->stop();
p->stop(queue_reset);
}

{
Expand Down
8 changes: 6 additions & 2 deletions src/msg/async/AsyncMessenger.h
Expand Up @@ -176,8 +176,10 @@ class AsyncMessenger : public SimplePolicyMessenger {
ConnectionRef get_connection(const entity_inst_t& dest) override;
ConnectionRef get_loopback_connection() override;
int send_keepalive(Connection *con);
void mark_down(const entity_addr_t& addr) override;
void mark_down_all() override;
virtual void mark_down(const entity_addr_t& addr) override;
virtual void mark_down_all() override {
shutdown_connections(true);
}
/** @} // Connection Management */

/**
Expand Down Expand Up @@ -348,6 +350,8 @@ class AsyncMessenger : public SimplePolicyMessenger {
ms_deliver_handle_fast_connect(local_connection.get());
}

void shutdown_connections(bool queue_reset);

public:

/// con used for sending messages to ourselves
Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/EventEpoll.cc
Expand Up @@ -123,8 +123,8 @@ int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval

if (e->events & EPOLLIN) mask |= EVENT_READABLE;
if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;
if (e->events & EPOLLERR) mask |= EVENT_WRITABLE;
if (e->events & EPOLLHUP) mask |= EVENT_WRITABLE;
if (e->events & EPOLLERR) mask |= EVENT_READABLE|EVENT_WRITABLE;
if (e->events & EPOLLHUP) mask |= EVENT_READABLE|EVENT_WRITABLE;
fired_events[j].fd = e->data.fd;
fired_events[j].mask = mask;
}
Expand Down