Skip to content

Commit

Permalink
Merge pull request #7831 from yuyuyu101/wip-14912
Browse files Browse the repository at this point in the history
AsyncMessenger: fix several bugs
  • Loading branch information
yuyuyu101 committed Mar 1, 2016
2 parents 48a5f2d + 0c0e8a0 commit 4ea3d68
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
39 changes: 15 additions & 24 deletions src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
r = read_bulk(sd, p+state_offset, left);
ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
return -1;
} else if (r == static_cast<int>(left)) {
state_offset = 0;
Expand All @@ -479,7 +479,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
<< " left is " << left << " got " << r << dendl;
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
return -1;
}
recv_end += r;
Expand All @@ -496,8 +496,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
recv_end = recv_start = 0;
}
ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
<< len - state_offset << " bytes, state is "
<< get_state_name(state) << dendl;
<< len - state_offset << " bytes" << dendl;
return len - state_offset;
}

Expand All @@ -508,17 +507,15 @@ void AsyncConnection::process()
bool already_dispatch_writer = false;
Mutex::Locker l(lock);
do {
ldout(async_msgr->cct, 20) << __func__ << " state is " << get_state_name(state)
<< ", prev state is " << get_state_name(prev_state) << dendl;
ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
prev_state = state;
switch (state) {
case STATE_OPEN:
{
char tag = -1;
r = read_until(sizeof(tag), &tag);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read tag failed, state is "
<< get_state_name(state) << dendl;
ldout(async_msgr->cct, 1) << __func__ << " read tag failed" << dendl;
goto fail;
} else if (r > 0) {
break;
Expand Down Expand Up @@ -1517,13 +1514,15 @@ ssize_t AsyncConnection::_process_connection()
memset(&connect_msg, 0, sizeof(connect_msg));
write_lock.Lock();
can_write = CANWRITE;
if (is_queued())
center->dispatch_event_external(write_handler);
write_lock.Unlock();
break;
}

default:
{
lderr(async_msgr->cct) << __func__ << " bad state: " << get_state_name(state) << dendl;
lderr(async_msgr->cct) << __func__ << " bad state: " << state << dendl;
assert(0);
}
}
Expand Down Expand Up @@ -1584,10 +1583,6 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
}
if (reply.tag == CEPH_MSGR_TAG_WAIT) {
ldout(async_msgr->cct, 3) << __func__ << " connect got WAIT (connection race)" << dendl;
if (!once_ready) {
ldout(async_msgr->cct, 1) << __func__ << " got WAIT while connection isn't registered, just closed." << dendl;
goto fail;
}
state = STATE_WAIT;
}

Expand Down Expand Up @@ -1678,8 +1673,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis

lock.Lock();
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state="
<< get_state_name(state) << dendl;
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl;
assert(state == STATE_CLOSED);
goto fail;
}
Expand All @@ -1693,7 +1687,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis

if (existing->replacing || existing->state == STATE_CLOSED) {
ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
<< " state=" << get_state_name(existing->state) << dendl;
<< " existing_state=" << get_state_name(existing->state) << dendl;
reply.global_seq = existing->peer_global_seq;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
existing->lock.Unlock();
Expand Down Expand Up @@ -1723,7 +1717,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
}

ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
<< " vs existing csq=" << existing->connect_seq << " state="
<< " vs existing csq=" << existing->connect_seq << " existing_state="
<< get_state_name(existing->state) << dendl;

if (connect.connect_seq == 0 && existing->connect_seq > 0) {
Expand Down Expand Up @@ -1941,8 +1935,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
goto fail_registered;
}
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state="
<< get_state_name(state) << dendl;
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
assert(state == STATE_CLOSED);
goto fail_registered;
}
Expand Down Expand Up @@ -2154,7 +2147,7 @@ int AsyncConnection::randomize_out_seq()
void AsyncConnection::fault()
{
if (state == STATE_CLOSED) {
ldout(async_msgr->cct, 10) << __func__ << " state is already " << get_state_name(state) << dendl;
ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
return ;
}

Expand Down Expand Up @@ -2185,8 +2178,7 @@ void AsyncConnection::fault()
if (!once_ready && !is_queued() &&
state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
<< " accept state just closed, state="
<< get_state_name(state) << dendl;
<< " accept state just closed" << dendl;
center->dispatch_event_external(reset_handler);

write_lock.Unlock();
Expand Down Expand Up @@ -2515,8 +2507,7 @@ void AsyncConnection::handle_write()
lock.Lock();
write_lock.Lock();
if (state == STATE_STANDBY && !policy.server && is_queued()) {
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< " policy.server is false" << dendl;
ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
_connect();
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send();
Expand Down
12 changes: 11 additions & 1 deletion src/msg/async/Event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,17 @@ int EventCenter::init(int n)

EventCenter::~EventCenter()
{
assert(external_events.empty() && time_events.empty());
{
Mutex::Locker l(external_lock);
while (!external_events.empty()) {
EventCallbackRef e = external_events.front();
if (e)
e->do_request(0);
external_events.pop_front();
}
}
assert(time_events.empty());

if (notify_receive_fd >= 0) {
delete_file_event(notify_receive_fd, EVENT_READABLE);
::close(notify_receive_fd);
Expand Down

0 comments on commit 4ea3d68

Please sign in to comment.