Skip to content

Commit

Permalink
Merge pull request #9781 from yuyuyu101/wip-epoll
Browse files (browse the repository at this point in the history)
msg/async: harden error logic handle
  • Loading branch information
yuyuyu101 committed Jun 29, 2016
2 parents aee60ff + f6c73e9 commit 4c26fae
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 31 deletions.
9 changes: 7 additions & 2 deletions src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
EventCenter *c, PerfCounters *p)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
Expand Down Expand Up @@ -354,6 +354,9 @@ ssize_t AsyncConnection::_try_send(bool more)
if (open_write && !is_queued()) {
center->delete_file_event(sd, EVENT_WRITABLE);
open_write = false;

if (state_after_send != STATE_NONE)
center->dispatch_event_external(read_handler);
}

return outcoming_bl.length();
Expand Down Expand Up @@ -962,7 +965,7 @@ ssize_t AsyncConnection::_process_connection()
if (!outcoming_bl.length()) {
assert(state_after_send);
state = state_after_send;
state_after_send = 0;
state_after_send = STATE_NONE;
}
break;
}
Expand Down Expand Up @@ -2007,6 +2010,7 @@ void AsyncConnection::requeue_sent()
ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
<< " (" << m->get_seq() << ")" << dendl;
rq.push_front(make_pair(bufferlist(), m));
out_seq.dec();
}
}

Expand All @@ -2025,6 +2029,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq)
<< " <= " << seq << ", discarding" << dendl;
p.second->put();
rq.pop_front();
out_seq.inc();
}
if (rq.empty())
out_q.erase(CEPH_MSG_PRIO_HIGHEST);
Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/AsyncConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ class AsyncConnection : public Connection {
void process();
void wakeup_from(uint64_t id);
void local_deliver();
void stop() {
void stop(bool queue_reset) {
lock.Lock();
bool need_queue_reset = (state != STATE_CLOSED);
bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
lock.Unlock();
mark_down();
if (need_queue_reset)
Expand Down
48 changes: 25 additions & 23 deletions src/msg/async/AsyncMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,11 @@ int Processor::start(Worker *w)
void Processor::accept()
{
ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl;
int errors = 0;
while (errors < 4) {
while (true) {
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
if (sd >= 0) {
errors = 0;
ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;

msgr->add_accept(sd);
Expand All @@ -293,10 +291,18 @@ void Processor::accept()
continue;
} else if (errno == EAGAIN) {
break;
} else if (errno == EMFILE || errno == ENFILE) {
lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
break;
} else if (errno == ECONNABORTED) {
ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
continue;
} else {
errors++;
ldout(msgr->cct, 20) << __func__ << " no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
lderr(msgr->cct) << __func__ << " no incoming connection? sd = " << sd
<< " errno " << errno << " " << cpp_strerror(errno) << dendl;
break;
}
}
}
Expand Down Expand Up @@ -571,16 +577,17 @@ int AsyncMessenger::shutdown()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;

// break ref cycles on the loopback connection
processor.stop();
mark_down_all();
dispatch_queue.shutdown();
// break ref cycles on the loopback connection
local_connection->set_priv(NULL);
pool->barrier();
// done! clean up.
processor.stop();
did_bind = false;
lock.Lock();
stop_cond.Signal();
lock.Unlock();
stopped = true;
lock.Unlock();
pool->barrier();
return 0;
}

Expand Down Expand Up @@ -653,22 +660,17 @@ void AsyncMessenger::wait()

lock.Unlock();

// done! clean up.
ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
processor.stop();
did_bind = false;
ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;

// close all connections
mark_down_all();

dispatch_queue.shutdown();
if (dispatch_queue.is_started()) {
ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
dispatch_queue.wait();
dispatch_queue.discard_local();
ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
}

// close all connections
shutdown_connections(false);

ldout(cct, 10) << __func__ << ": done." << dendl;
ldout(cct, 1) << __func__ << " complete." << dendl;
started = false;
Expand Down Expand Up @@ -809,15 +811,15 @@ int AsyncMessenger::send_keepalive(Connection *con)
return 0;
}

void AsyncMessenger::mark_down_all()
void AsyncMessenger::shutdown_connections(bool queue_reset)
{
ldout(cct,1) << __func__ << " " << dendl;
lock.Lock();
for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
q != accepting_conns.end(); ++q) {
AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
p->stop();
p->stop(queue_reset);
}
accepting_conns.clear();

Expand All @@ -827,7 +829,7 @@ void AsyncMessenger::mark_down_all()
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
p->get_perf_counter()->dec(l_msgr_active_connections);
p->stop();
p->stop(queue_reset);
}

{
Expand Down
8 changes: 6 additions & 2 deletions src/msg/async/AsyncMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ class AsyncMessenger : public SimplePolicyMessenger {
ConnectionRef get_connection(const entity_inst_t& dest) override;
ConnectionRef get_loopback_connection() override;
int send_keepalive(Connection *con);
void mark_down(const entity_addr_t& addr) override;
void mark_down_all() override;
virtual void mark_down(const entity_addr_t& addr) override;
virtual void mark_down_all() override {
shutdown_connections(true);
}
/** @} // Connection Management */

/**
Expand Down Expand Up @@ -348,6 +350,8 @@ class AsyncMessenger : public SimplePolicyMessenger {
ms_deliver_handle_fast_connect(local_connection.get());
}

void shutdown_connections(bool queue_reset);

public:

/// con used for sending messages to ourselves
Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/EventEpoll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval

if (e->events & EPOLLIN) mask |= EVENT_READABLE;
if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;
if (e->events & EPOLLERR) mask |= EVENT_WRITABLE;
if (e->events & EPOLLHUP) mask |= EVENT_WRITABLE;
if (e->events & EPOLLERR) mask |= EVENT_READABLE|EVENT_WRITABLE;
if (e->events & EPOLLHUP) mask |= EVENT_READABLE|EVENT_WRITABLE;
fired_events[j].fd = e->data.fd;
fired_events[j].mask = mask;
}
Expand Down

0 comments on commit 4c26fae

Please sign in to comment.