Skip to content

Commit

Permalink
Merge pull request #12901 from yuyuyu101/wip-msgr-client-bind
Browse files Browse the repository at this point in the history
msg: client bind

Reviewed-by: Sage Weil <sage@redhat.com>
  • Loading branch information
yuriw committed Jan 23, 2017
2 parents f848235 + 6e4ed29 commit f9f9b63
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 32 deletions.
37 changes: 27 additions & 10 deletions src/ceph_osd.cc
Expand Up @@ -450,8 +450,11 @@ int main(int argc, const char **argv)
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_client",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_front_client",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_server",
Expand All @@ -462,10 +465,11 @@ int main(int argc, const char **argv)
Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid(), 0);
if (!ms_public || !ms_cluster || !ms_hbclient || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
exit(1);
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_front_client->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_back_client->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);

Expand Down Expand Up @@ -523,7 +527,9 @@ int main(int argc, const char **argv)
ms_cluster->set_policy(entity_name_t::TYPE_CLIENT,
Messenger::Policy::stateless_server(0, 0));

ms_hbclient->set_policy(entity_name_t::TYPE_OSD,
ms_hb_front_client->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::lossy_client(0, 0));
ms_hb_back_client->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::lossy_client(0, 0));
ms_hb_back_server->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::stateless_server(0, 0));
Expand All @@ -540,7 +546,8 @@ int main(int argc, const char **argv)
exit(1);

if (g_conf->osd_heartbeat_use_min_delay_socket) {
ms_hbclient->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_front_client->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_back_client->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_back_server->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_front_server->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
}
Expand All @@ -553,6 +560,9 @@ int main(int argc, const char **argv)
hb_back_addr.set_port(0);
}
r = ms_hb_back_server->bind(hb_back_addr);
if (r < 0)
exit(1);
r = ms_hb_back_client->client_bind(hb_back_addr);
if (r < 0)
exit(1);

Expand All @@ -561,6 +571,9 @@ int main(int argc, const char **argv)
if (hb_front_addr.is_ip())
hb_front_addr.set_port(0);
r = ms_hb_front_server->bind(hb_front_addr);
if (r < 0)
exit(1);
r = ms_hb_front_client->client_bind(hb_front_addr);
if (r < 0)
exit(1);

Expand All @@ -586,7 +599,8 @@ int main(int argc, const char **argv)
whoami,
ms_cluster,
ms_public,
ms_hbclient,
ms_hb_front_client,
ms_hb_back_client,
ms_hb_front_server,
ms_hb_back_server,
ms_objecter,
Expand All @@ -602,7 +616,8 @@ int main(int argc, const char **argv)
}

ms_public->start();
ms_hbclient->start();
ms_hb_front_client->start();
ms_hb_back_client->start();
ms_hb_front_server->start();
ms_hb_back_server->start();
ms_cluster->start();
Expand Down Expand Up @@ -632,7 +647,8 @@ int main(int argc, const char **argv)
kill(getpid(), SIGTERM);

ms_public->wait();
ms_hbclient->wait();
ms_hb_front_client->wait();
ms_hb_back_client->wait();
ms_hb_front_server->wait();
ms_hb_back_server->wait();
ms_cluster->wait();
Expand All @@ -646,7 +662,8 @@ int main(int argc, const char **argv)
// done
delete osd;
delete ms_public;
delete ms_hbclient;
delete ms_hb_front_client;
delete ms_hb_back_client;
delete ms_hb_front_server;
delete ms_hb_back_server;
delete ms_cluster;
Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -199,6 +199,7 @@ OPTION(ms_bind_retry_delay, OPT_INT, 5) // Delay between attemps to bind
OPTION(ms_bind_retry_count, OPT_INT, 6) // If binding fails, how many times do we retry to bind
OPTION(ms_bind_retry_delay, OPT_INT, 6) // Delay between attemps to bind
#endif
OPTION(ms_bind_before_connect, OPT_BOOL, true)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 16777216)
Expand Down
8 changes: 8 additions & 0 deletions src/msg/Messenger.h
Expand Up @@ -408,6 +408,14 @@ class Messenger {
* @param avoid_ports Additional port to avoid binding to.
*/
virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; }
/**
* Bind the 'client' Messenger to a specific address.Messenger will bind
* the address before connect to others when option ms_bind_before_connect
* is true.
* @param bind_addr The address to bind to.
* @return 0 on success, or -1 on error, or -errno if
*/
virtual int client_bind(const entity_addr_t& bind_addr) = 0;
/**
* @} // Configuration
*/
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -883,6 +883,7 @@ ssize_t AsyncConnection::_process_connection()

SocketOptions opts;
opts.priority = async_msgr->get_socket_priority();
opts.connect_bind_addr = msgr->get_myaddr();
r = worker->connect(get_peer_addr(), opts, &cs);
if (r < 0)
goto fail;
Expand Down
19 changes: 19 additions & 0 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -380,6 +380,25 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
return 0;
}

int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
{
lock.Lock();
if (did_bind) {
assert(my_inst.addr == bind_addr);
return 0;
}
if (started) {
ldout(cct, 10) << __func__ << " already started" << dendl;
lock.Unlock();
return -1;
}
ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
lock.Unlock();

set_myaddr(bind_addr);
return 0;
}

void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
const entity_addr_t& listen_addr)
{
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/AsyncMessenger.h
Expand Up @@ -116,6 +116,7 @@ class AsyncMessenger : public SimplePolicyMessenger {

int bind(const entity_addr_t& bind_addr);
int rebind(const set<int>& avoid_ports);
int client_bind(const entity_addr_t& bind_addr);

/** @} Configuration functions */

Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/PosixStack.cc
Expand Up @@ -339,9 +339,9 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C
int sd;

if (opts.nonblock) {
sd = net.nonblock_connect(addr);
sd = net.nonblock_connect(addr, opts.connect_bind_addr);
} else {
sd = net.connect(addr);
sd = net.connect(addr, opts.connect_bind_addr);
}

if (sd < 0) {
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/Stack.h
Expand Up @@ -42,6 +42,7 @@ struct SocketOptions {
bool nodelay = true;
int rcbuf_size = 0;
int priority = -1;
entity_addr_t connect_bind_addr;
};

/// \cond internal
Expand Down
22 changes: 17 additions & 5 deletions src/msg/async/net_handler.cc
Expand Up @@ -150,7 +150,7 @@ void NetHandler::set_priority(int sd, int prio)
}
}

int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
int NetHandler::generic_connect(const entity_addr_t& addr, const entity_addr_t &bind_addr, bool nonblock)
{
int ret;
int s = create_socket(addr.get_family());
Expand All @@ -167,6 +167,18 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)

set_socket_options(s, cct->_conf->ms_tcp_nodelay, cct->_conf->ms_tcp_rcvbuf);

{
entity_addr_t addr = bind_addr;
if (cct->_conf->ms_bind_before_connect && (!addr.is_blank_ip())) {
addr.set_port(0);
ret = ::bind(s, addr.get_sockaddr(), addr.get_sockaddr_len());
if (ret < 0) {
ret = -errno;
ldout(cct, 2) << __func__ << " client bind error " << ", " << cpp_strerror(ret) << dendl;
return ret;
}
}
}

ret = ::connect(s, addr.get_sockaddr(), addr.get_sockaddr_len());
if (ret < 0) {
Expand Down Expand Up @@ -195,14 +207,14 @@ int NetHandler::reconnect(const entity_addr_t &addr, int sd)
return 0;
}

int NetHandler::connect(const entity_addr_t &addr)
int NetHandler::connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
return generic_connect(addr, false);
return generic_connect(addr, bind_addr, false);
}

int NetHandler::nonblock_connect(const entity_addr_t &addr)
int NetHandler::nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
return generic_connect(addr, true);
return generic_connect(addr, bind_addr, true);
}


Expand Down
6 changes: 3 additions & 3 deletions src/msg/async/net_handler.h
Expand Up @@ -20,7 +20,7 @@

namespace ceph {
class NetHandler {
int generic_connect(const entity_addr_t& addr, bool nonblock);
int generic_connect(const entity_addr_t& addr, const entity_addr_t& bind_addr, bool nonblock);

CephContext *cct;
public:
Expand All @@ -29,7 +29,7 @@ namespace ceph {
int set_nonblock(int sd);
void set_close_on_exec(int sd);
int set_socket_options(int sd, bool nodelay, int size);
int connect(const entity_addr_t &addr);
int connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);

/**
* Try to reconnect the socket.
Expand All @@ -39,7 +39,7 @@ namespace ceph {
* < 0 need to goto fail
*/
int reconnect(const entity_addr_t &addr, int sd);
int nonblock_connect(const entity_addr_t &addr);
int nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);
void set_priority(int sd, int priority);
};
}
Expand Down
12 changes: 12 additions & 0 deletions src/msg/simple/Pipe.cc
Expand Up @@ -989,6 +989,18 @@ int Pipe::connect()

set_socket_options();

{
entity_addr_t addr2bind = msgr->get_myaddr();
if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
addr2bind.set_port(0);
int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
if (r < 0) {
ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
goto fail;
}
}
}

// connect!
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
Expand Down
21 changes: 21 additions & 0 deletions src/msg/simple/SimpleMessenger.cc
Expand Up @@ -306,6 +306,27 @@ int SimpleMessenger::rebind(const set<int>& avoid_ports)
return accepter.rebind(avoid_ports);
}


int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
{
lock.Lock();
if (did_bind) {
assert(my_inst.addr == bind_addr);
return 0;
}
if (started) {
ldout(cct,10) << "rank.bind already started" << dendl;
lock.Unlock();
return -1;
}
ldout(cct,10) << "rank.bind " << bind_addr << dendl;
lock.Unlock();

set_myaddr(bind_addr);
return 0;
}


int SimpleMessenger::start()
{
lock.Lock();
Expand Down
1 change: 1 addition & 0 deletions src/msg/simple/SimpleMessenger.h
Expand Up @@ -115,6 +115,7 @@ class SimpleMessenger : public SimplePolicyMessenger {

int bind(const entity_addr_t& bind_addr);
int rebind(const set<int>& avoid_ports);
int client_bind(const entity_addr_t& bind_addr);

/** @} Configuration functions */

Expand Down
19 changes: 12 additions & 7 deletions src/osd/OSD.cc
Expand Up @@ -837,9 +837,9 @@ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t f
release_map(next_map);
return ret;
}
ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
ret.first = osd->hb_back_client_messenger->get_connection(next_map->get_hb_back_inst(peer));
if (next_map->get_hb_front_addr(peer) != entity_addr_t())
ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
ret.second = osd->hb_front_client_messenger->get_connection(next_map->get_hb_front_inst(peer));
release_map(next_map);
return ret;
}
Expand Down Expand Up @@ -1638,7 +1638,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
int id,
Messenger *internal_messenger,
Messenger *external_messenger,
Messenger *hb_clientm,
Messenger *hb_client_front,
Messenger *hb_client_back,
Messenger *hb_front_serverm,
Messenger *hb_back_serverm,
Messenger *osdc_messenger,
Expand Down Expand Up @@ -1680,7 +1681,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false),
heartbeat_need_update(true),
hbclient_messenger(hb_clientm),
hb_front_client_messenger(hb_client_front),
hb_back_client_messenger(hb_client_back),
hb_front_server_messenger(hb_front_serverm),
hb_back_server_messenger(hb_back_serverm),
daily_loadavg(0.0),
Expand Down Expand Up @@ -2189,7 +2191,8 @@ int OSD::init()
client_messenger->add_dispatcher_head(this);
cluster_messenger->add_dispatcher_head(this);

hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);

Expand Down Expand Up @@ -2860,7 +2863,8 @@ int OSD::shutdown()
class_handler->shutdown();
client_messenger->shutdown();
cluster_messenger->shutdown();
hbclient_messenger->shutdown();
hb_front_client_messenger->shutdown();
hb_back_client_messenger->shutdown();
objecter_messenger->shutdown();
hb_front_server_messenger->shutdown();
hb_back_server_messenger->shutdown();
Expand Down Expand Up @@ -7219,7 +7223,8 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
<< " rebind hb_front_server_messenger failed" << dendl;
}

hbclient_messenger->mark_down_all();
hb_front_client_messenger->mark_down_all();
hb_back_client_messenger->mark_down_all();

reset_heartbeat_peers();
}
Expand Down

0 comments on commit f9f9b63

Please sign in to comment.