Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg/async: support of non-block connect in async messenger #5681

Closed
Closed
24 changes: 18 additions & 6 deletions src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -967,17 +967,27 @@ int AsyncConnection::_process_connection()
::close(sd);
}

sd = net.connect(get_peer_addr());
sd = net.nonblock_connect(get_peer_addr());
if (sd < 0) {
goto fail;
}
r = net.set_nonblock(sd);

center->create_file_event(sd, EVENT_READABLE, read_handler);
state = STATE_CONNECTING_RE;
break;
}

case STATE_CONNECTING_RE:
{
r = net.reconnect(get_peer_addr(), sd);
if (r < 0) {
if (r == -EINPROGRESS || r == -EALREADY)
break;
goto fail;
}

net.set_socket_options(sd);

center->create_file_event(sd, EVENT_READABLE, read_handler);
state = STATE_CONNECTING_WAIT_BANNER;
break;
}
Expand Down Expand Up @@ -2069,7 +2079,7 @@ void AsyncConnection::fault()
}

write_lock.Lock();
if (sd >= 0) {
if (sd >= 0 && state != STATE_CONNECTING_RE) {
shutdown_socket();
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
::close(sd);
Expand Down Expand Up @@ -2123,7 +2133,9 @@ void AsyncConnection::fault()
if (backoff > async_msgr->cct->_conf->ms_max_backoff)
backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
}
state = STATE_CONNECTING;

if (state != STATE_CONNECTING_RE)
state = STATE_CONNECTING;
ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
}

Expand Down Expand Up @@ -2415,7 +2427,7 @@ void AsyncConnection::handle_write()
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< " policy.server is false" << dendl;
_connect();
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send(bl);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
Expand Down
2 changes: 2 additions & 0 deletions src/msg/async/AsyncConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class AsyncConnection : public Connection {
STATE_OPEN_TAG_CLOSE,
STATE_WAIT_SEND,
STATE_CONNECTING,
STATE_CONNECTING_RE,
STATE_CONNECTING_WAIT_BANNER,
STATE_CONNECTING_WAIT_IDENTIFY_PEER,
STATE_CONNECTING_SEND_CONNECT_MSG,
Expand Down Expand Up @@ -196,6 +197,7 @@ class AsyncConnection : public Connection {
"STATE_OPEN_TAG_CLOSE",
"STATE_WAIT_SEND",
"STATE_CONNECTING",
"STATE_CONNECTING_RE",
"STATE_CONNECTING_WAIT_BANNER",
"STATE_CONNECTING_WAIT_IDENTIFY_PEER",
"STATE_CONNECTING_SEND_CONNECT_MSG",
Expand Down
12 changes: 12 additions & 0 deletions src/msg/async/net_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
return s;
}

int NetHandler::reconnect(const entity_addr_t &addr, int sd)
{
int ret = ::connect(sd, (sockaddr*)&addr.addr, addr.addr_size());

if (ret < 0 && errno != EISCONN) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it maybe still in connecting(EINPROGRESS)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In reconnect , it must be wakened by poll(epoll) with an read event. That means it being connected or peer have sent some data to me. We can use reconnect to test whether it is connected or not. If it is connected, then errno will be EISCONN. If it is not connected, we will go to fail, and go to connecting again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it may not only wake up from epoll. AsyncConnection has time event which also can wakeup and it maybe legacy so that wakeup connection when connecting

ldout(cct, 10) << __func__ << " reconnect: " << strerror(errno) << dendl;
return -errno;
}

return sd;
}

int NetHandler::connect(const entity_addr_t &addr)
{
return generic_connect(addr, false);
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/net_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace ceph {
int set_nonblock(int sd);
void set_socket_options(int sd);
int connect(const entity_addr_t &addr);
int reconnect(const entity_addr_t &addr, int sd);
int nonblock_connect(const entity_addr_t &addr);
};
}
Expand Down