Skip to content

Commit

Permalink
msg: add ms_bind_before_connect to bind before connect
Browse files Browse the repository at this point in the history
Signed-off-by: Zengran Zhang <zhangzengran@h3c.com>
Signed-off-by: Haomai Wang <haomai@xsky.com>
  • Loading branch information
yuyuyu101 committed Jan 19, 2017
1 parent 08caf09 commit 6e4ed29
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/ceph_osd.cc
Expand Up @@ -573,7 +573,7 @@ int main(int argc, const char **argv)
r = ms_hb_front_server->bind(hb_front_addr);
if (r < 0)
exit(1);
r = ms_hb_front_client->client_bind(hb_back_addr);
r = ms_hb_front_client->client_bind(hb_front_addr);
if (r < 0)
exit(1);

Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -198,6 +198,7 @@ OPTION(ms_bind_retry_delay, OPT_INT, 5) // Delay between attemps to bind
OPTION(ms_bind_retry_count, OPT_INT, 6) // If binding fails, how many times do we retry to bind
OPTION(ms_bind_retry_delay, OPT_INT, 6) // Delay between attemps to bind
#endif
OPTION(ms_bind_before_connect, OPT_BOOL, true)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 16777216)
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -861,6 +861,7 @@ ssize_t AsyncConnection::_process_connection()

SocketOptions opts;
opts.priority = async_msgr->get_socket_priority();
opts.connect_bind_addr = msgr->get_myaddr();
r = worker->connect(get_peer_addr(), opts, &cs);
if (r < 0)
goto fail;
Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/PosixStack.cc
Expand Up @@ -339,9 +339,9 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C
int sd;

if (opts.nonblock) {
sd = net.nonblock_connect(addr);
sd = net.nonblock_connect(addr, opts.connect_bind_addr);
} else {
sd = net.connect(addr);
sd = net.connect(addr, opts.connect_bind_addr);
}

if (sd < 0) {
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/Stack.h
Expand Up @@ -42,6 +42,7 @@ struct SocketOptions {
bool nodelay = true;
int rcbuf_size = 0;
int priority = -1;
entity_addr_t connect_bind_addr;
};

/// \cond internal
Expand Down
22 changes: 17 additions & 5 deletions src/msg/async/net_handler.cc
Expand Up @@ -150,7 +150,7 @@ void NetHandler::set_priority(int sd, int prio)
}
}

int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
int NetHandler::generic_connect(const entity_addr_t& addr, const entity_addr_t &bind_addr, bool nonblock)
{
int ret;
int s = create_socket(addr.get_family());
Expand All @@ -167,6 +167,18 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)

set_socket_options(s, cct->_conf->ms_tcp_nodelay, cct->_conf->ms_tcp_rcvbuf);

{
entity_addr_t addr = bind_addr;
if (cct->_conf->ms_bind_before_connect && (!addr.is_blank_ip())) {
addr.set_port(0);
ret = ::bind(s, addr.get_sockaddr(), addr.get_sockaddr_len());
if (ret < 0) {
ret = -errno;
ldout(cct, 2) << __func__ << " client bind error " << ", " << cpp_strerror(ret) << dendl;
return ret;
}
}
}

ret = ::connect(s, addr.get_sockaddr(), addr.get_sockaddr_len());
if (ret < 0) {
Expand Down Expand Up @@ -195,14 +207,14 @@ int NetHandler::reconnect(const entity_addr_t &addr, int sd)
return 0;
}

int NetHandler::connect(const entity_addr_t &addr)
int NetHandler::connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
return generic_connect(addr, false);
return generic_connect(addr, bind_addr, false);
}

int NetHandler::nonblock_connect(const entity_addr_t &addr)
int NetHandler::nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
return generic_connect(addr, true);
return generic_connect(addr, bind_addr, true);
}


Expand Down
6 changes: 3 additions & 3 deletions src/msg/async/net_handler.h
Expand Up @@ -20,7 +20,7 @@

namespace ceph {
class NetHandler {
int generic_connect(const entity_addr_t& addr, bool nonblock);
int generic_connect(const entity_addr_t& addr, const entity_addr_t& bind_addr, bool nonblock);

CephContext *cct;
public:
Expand All @@ -29,7 +29,7 @@ namespace ceph {
int set_nonblock(int sd);
void set_close_on_exec(int sd);
int set_socket_options(int sd, bool nodelay, int size);
int connect(const entity_addr_t &addr);
int connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);

/**
* Try to reconnect the socket.
Expand All @@ -39,7 +39,7 @@ namespace ceph {
* < 0 need to goto fail
*/
int reconnect(const entity_addr_t &addr, int sd);
int nonblock_connect(const entity_addr_t &addr);
int nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);
void set_priority(int sd, int priority);
};
}
Expand Down
12 changes: 12 additions & 0 deletions src/msg/simple/Pipe.cc
Expand Up @@ -989,6 +989,18 @@ int Pipe::connect()

set_socket_options();

{
entity_addr_t addr2bind = msgr->get_myaddr();
if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
addr2bind.set_port(0);
int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
if (r < 0) {
ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
goto fail;
}
}
}

// connect!
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
Expand Down

0 comments on commit 6e4ed29

Please sign in to comment.