Skip to content

Commit

Permalink
msg/async: support of non-block connect in async messenger
Browse files Browse the repository at this point in the history
Fixes: #12802

Signed-off-by: Jianhui Yuan <zuiwanyuan@gmail.com>
  • Loading branch information
zuiwanyuan committed Sep 9, 2015
1 parent 3868939 commit a913b67
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
25 changes: 19 additions & 6 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -971,16 +971,27 @@ int AsyncConnection::_process_connection()
::close(sd);
}

sd = net.connect(get_peer_addr());
sd = net.nonblock_connect(get_peer_addr());
if (sd < 0) {
goto fail;
}
r = net.set_nonblock(sd);

center->create_file_event(sd, EVENT_READABLE, read_handler);
state = STATE_CONNECTING_RE;
break;
}

case STATE_CONNECTING_RE:
{
r = net.reconnect(get_peer_addr(), sd);
if (r < 0) {
if (r == -EINPROGRESS || r == -EALREADY)
break;
goto fail;
}

center->create_file_event(sd, EVENT_READABLE, read_handler);
net.set_socket_options(sd);

state = STATE_CONNECTING_WAIT_BANNER;
break;
}
Expand Down Expand Up @@ -2078,7 +2089,7 @@ void AsyncConnection::fault()
}

write_lock.Lock();
if (sd >= 0) {
if (sd >= 0 && state != STATE_CONNECTING_RE) {
shutdown_socket();
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
::close(sd);
Expand Down Expand Up @@ -2132,7 +2143,9 @@ void AsyncConnection::fault()
if (backoff > async_msgr->cct->_conf->ms_max_backoff)
backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
}
state = STATE_CONNECTING;

if (state != STATE_CONNECTING_RE)
state = STATE_CONNECTING;
ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
}

Expand Down Expand Up @@ -2424,7 +2437,7 @@ void AsyncConnection::handle_write()
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< " policy.server is false" << dendl;
_connect();
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send(bl);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
Expand Down
2 changes: 2 additions & 0 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -160,6 +160,7 @@ class AsyncConnection : public Connection {
STATE_OPEN_TAG_CLOSE,
STATE_WAIT_SEND,
STATE_CONNECTING,
STATE_CONNECTING_RE,
STATE_CONNECTING_WAIT_BANNER,
STATE_CONNECTING_WAIT_IDENTIFY_PEER,
STATE_CONNECTING_SEND_CONNECT_MSG,
Expand Down Expand Up @@ -196,6 +197,7 @@ class AsyncConnection : public Connection {
"STATE_OPEN_TAG_CLOSE",
"STATE_WAIT_SEND",
"STATE_CONNECTING",
"STATE_CONNECTING_RE",
"STATE_CONNECTING_WAIT_BANNER",
"STATE_CONNECTING_WAIT_IDENTIFY_PEER",
"STATE_CONNECTING_SEND_CONNECT_MSG",
Expand Down
12 changes: 12 additions & 0 deletions src/msg/async/net_handler.cc
Expand Up @@ -132,6 +132,18 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
return s;
}

int NetHandler::reconnect(const entity_addr_t &addr, int sd)
{
int ret = ::connect(sd, (sockaddr*)&addr.addr, addr.addr_size());

if (ret < 0 && errno != EISCONN) {
ldout(cct, 10) << __func__ << " reconnect: " << strerror(errno) << dendl;
return -errno;
}

return sd;
}

int NetHandler::connect(const entity_addr_t &addr)
{
return generic_connect(addr, false);
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/net_handler.h
Expand Up @@ -30,6 +30,7 @@ namespace ceph {
int set_nonblock(int sd);
void set_socket_options(int sd);
int connect(const entity_addr_t &addr);
int reconnect(const entity_addr_t &addr, int sd);
int nonblock_connect(const entity_addr_t &addr);
};
}
Expand Down

0 comments on commit a913b67

Please sign in to comment.