Skip to content

Commit

Permalink
Merge pull request #5848 from storage-zuiwanyuan/wip-nonblock-connect
Browse files Browse the repository at this point in the history
msg/async: support of non-block connect in async messenger

Reviewed-by: Haomai Wang <haomai@xsky.com>
  • Loading branch information
liewegas committed Nov 13, 2015
2 parents 0fa88ea + 0fd8de3 commit f622301
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 4 deletions.
21 changes: 17 additions & 4 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -972,16 +972,28 @@ int AsyncConnection::_process_connection()
::close(sd);
}

sd = net.connect(get_peer_addr());
sd = net.nonblock_connect(get_peer_addr());
if (sd < 0) {
goto fail;
}
r = net.set_nonblock(sd);

center->create_file_event(sd, EVENT_READABLE, read_handler);
state = STATE_CONNECTING_RE;
break;
}

case STATE_CONNECTING_RE:
{
r = net.reconnect(get_peer_addr(), sd);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
goto fail;
} else if (r > 0) {
break;
}

center->create_file_event(sd, EVENT_READABLE, read_handler);
net.set_socket_options(sd);

state = STATE_CONNECTING_WAIT_BANNER;
break;
}
Expand Down Expand Up @@ -2134,6 +2146,7 @@ void AsyncConnection::fault()
if (backoff > async_msgr->cct->_conf->ms_max_backoff)
backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
}

state = STATE_CONNECTING;
ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
}
Expand Down Expand Up @@ -2426,7 +2439,7 @@ void AsyncConnection::handle_write()
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< " policy.server is false" << dendl;
_connect();
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send(bl);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
Expand Down
2 changes: 2 additions & 0 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -160,6 +160,7 @@ class AsyncConnection : public Connection {
STATE_OPEN_TAG_CLOSE,
STATE_WAIT_SEND,
STATE_CONNECTING,
STATE_CONNECTING_RE,
STATE_CONNECTING_WAIT_BANNER,
STATE_CONNECTING_WAIT_IDENTIFY_PEER,
STATE_CONNECTING_SEND_CONNECT_MSG,
Expand Down Expand Up @@ -196,6 +197,7 @@ class AsyncConnection : public Connection {
"STATE_OPEN_TAG_CLOSE",
"STATE_WAIT_SEND",
"STATE_CONNECTING",
"STATE_CONNECTING_RE",
"STATE_CONNECTING_WAIT_BANNER",
"STATE_CONNECTING_WAIT_IDENTIFY_PEER",
"STATE_CONNECTING_SEND_CONNECT_MSG",
Expand Down
14 changes: 14 additions & 0 deletions src/msg/async/net_handler.cc
Expand Up @@ -132,6 +132,20 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
return s;
}

int NetHandler::reconnect(const entity_addr_t &addr, int sd)
{
int ret = ::connect(sd, (sockaddr*)&addr.addr, addr.addr_size());

if (ret < 0 && errno != EISCONN) {
ldout(cct, 10) << __func__ << " reconnect: " << strerror(errno) << dendl;
if (errno == EINPROGRESS || errno == EALREADY)
return 1;
return -errno;
}

return 0;
}

int NetHandler::connect(const entity_addr_t &addr)
{
return generic_connect(addr, false);
Expand Down
9 changes: 9 additions & 0 deletions src/msg/async/net_handler.h
Expand Up @@ -30,6 +30,15 @@ namespace ceph {
int set_nonblock(int sd);
void set_socket_options(int sd);
int connect(const entity_addr_t &addr);

/**
* Try to reconnect the socket.
*
* @return 0 success
* > 0 just break, and wait for event
* < 0 need to goto fail
*/
int reconnect(const entity_addr_t &addr, int sd);
int nonblock_connect(const entity_addr_t &addr);
};
}
Expand Down

0 comments on commit f622301

Please sign in to comment.