Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg: client bind #12901

Merged
merged 4 commits into from Jan 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 27 additions & 10 deletions src/ceph_osd.cc
Expand Up @@ -450,8 +450,11 @@ int main(int argc, const char **argv)
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_client",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_front_client",
getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_server",
Expand All @@ -462,10 +465,11 @@ int main(int argc, const char **argv)
Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid(), 0);
if (!ms_public || !ms_cluster || !ms_hbclient || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
exit(1);
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_front_client->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_back_client->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);

Expand Down Expand Up @@ -523,7 +527,9 @@ int main(int argc, const char **argv)
ms_cluster->set_policy(entity_name_t::TYPE_CLIENT,
Messenger::Policy::stateless_server(0, 0));

ms_hbclient->set_policy(entity_name_t::TYPE_OSD,
ms_hb_front_client->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::lossy_client(0, 0));
ms_hb_back_client->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::lossy_client(0, 0));
ms_hb_back_server->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::stateless_server(0, 0));
Expand All @@ -540,7 +546,8 @@ int main(int argc, const char **argv)
exit(1);

if (g_conf->osd_heartbeat_use_min_delay_socket) {
ms_hbclient->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_front_client->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_back_client->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_back_server->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
ms_hb_front_server->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
}
Expand All @@ -553,6 +560,9 @@ int main(int argc, const char **argv)
hb_back_addr.set_port(0);
}
r = ms_hb_back_server->bind(hb_back_addr);
if (r < 0)
exit(1);
r = ms_hb_back_client->client_bind(hb_back_addr);
if (r < 0)
exit(1);

Expand All @@ -561,6 +571,9 @@ int main(int argc, const char **argv)
if (hb_front_addr.is_ip())
hb_front_addr.set_port(0);
r = ms_hb_front_server->bind(hb_front_addr);
if (r < 0)
exit(1);
r = ms_hb_front_client->client_bind(hb_front_addr);
if (r < 0)
exit(1);

Expand All @@ -586,7 +599,8 @@ int main(int argc, const char **argv)
whoami,
ms_cluster,
ms_public,
ms_hbclient,
ms_hb_front_client,
ms_hb_back_client,
ms_hb_front_server,
ms_hb_back_server,
ms_objecter,
Expand All @@ -602,7 +616,8 @@ int main(int argc, const char **argv)
}

ms_public->start();
ms_hbclient->start();
ms_hb_front_client->start();
ms_hb_back_client->start();
ms_hb_front_server->start();
ms_hb_back_server->start();
ms_cluster->start();
Expand Down Expand Up @@ -632,7 +647,8 @@ int main(int argc, const char **argv)
kill(getpid(), SIGTERM);

ms_public->wait();
ms_hbclient->wait();
ms_hb_front_client->wait();
ms_hb_back_client->wait();
ms_hb_front_server->wait();
ms_hb_back_server->wait();
ms_cluster->wait();
Expand All @@ -646,7 +662,8 @@ int main(int argc, const char **argv)
// done
delete osd;
delete ms_public;
delete ms_hbclient;
delete ms_hb_front_client;
delete ms_hb_back_client;
delete ms_hb_front_server;
delete ms_hb_back_server;
delete ms_cluster;
Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -198,6 +198,7 @@ OPTION(ms_bind_retry_delay, OPT_INT, 5) // Delay between attemps to bind
OPTION(ms_bind_retry_count, OPT_INT, 6) // If binding fails, how many times do we retry to bind
OPTION(ms_bind_retry_delay, OPT_INT, 6) // Delay between attemps to bind
#endif
OPTION(ms_bind_before_connect, OPT_BOOL, true)
OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 16777216)
Expand Down
8 changes: 8 additions & 0 deletions src/msg/Messenger.h
Expand Up @@ -408,6 +408,14 @@ class Messenger {
* @param avoid_ports Additional port to avoid binding to.
*/
virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; }
/**
* Bind the 'client' Messenger to a specific address.Messenger will bind
* the address before connect to others when option ms_bind_before_connect
* is true.
* @param bind_addr The address to bind to.
* @return 0 on success, or -1 on error, or -errno if
*/
virtual int client_bind(const entity_addr_t& bind_addr) = 0;
/**
* @} // Configuration
*/
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -861,6 +861,7 @@ ssize_t AsyncConnection::_process_connection()

SocketOptions opts;
opts.priority = async_msgr->get_socket_priority();
opts.connect_bind_addr = msgr->get_myaddr();
r = worker->connect(get_peer_addr(), opts, &cs);
if (r < 0)
goto fail;
Expand Down
19 changes: 19 additions & 0 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -376,6 +376,25 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
return 0;
}

int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
{
lock.Lock();
if (did_bind) {
assert(my_inst.addr == bind_addr);
return 0;
}
if (started) {
ldout(cct, 10) << __func__ << " already started" << dendl;
lock.Unlock();
return -1;
}
ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
lock.Unlock();

set_myaddr(bind_addr);
return 0;
}

void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
const entity_addr_t& listen_addr)
{
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/AsyncMessenger.h
Expand Up @@ -116,6 +116,7 @@ class AsyncMessenger : public SimplePolicyMessenger {

int bind(const entity_addr_t& bind_addr);
int rebind(const set<int>& avoid_ports);
int client_bind(const entity_addr_t& bind_addr);

/** @} Configuration functions */

Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/PosixStack.cc
Expand Up @@ -339,9 +339,9 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C
int sd;

if (opts.nonblock) {
sd = net.nonblock_connect(addr);
sd = net.nonblock_connect(addr, opts.connect_bind_addr);
} else {
sd = net.connect(addr);
sd = net.connect(addr, opts.connect_bind_addr);
}

if (sd < 0) {
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/Stack.h
Expand Up @@ -42,6 +42,7 @@ struct SocketOptions {
bool nodelay = true;
int rcbuf_size = 0;
int priority = -1;
entity_addr_t connect_bind_addr;
};

/// \cond internal
Expand Down
22 changes: 17 additions & 5 deletions src/msg/async/net_handler.cc
Expand Up @@ -150,7 +150,7 @@ void NetHandler::set_priority(int sd, int prio)
}
}

int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)
int NetHandler::generic_connect(const entity_addr_t& addr, const entity_addr_t &bind_addr, bool nonblock)
{
int ret;
int s = create_socket(addr.get_family());
Expand All @@ -167,6 +167,18 @@ int NetHandler::generic_connect(const entity_addr_t& addr, bool nonblock)

set_socket_options(s, cct->_conf->ms_tcp_nodelay, cct->_conf->ms_tcp_rcvbuf);

{
entity_addr_t addr = bind_addr;
if (cct->_conf->ms_bind_before_connect && (!addr.is_blank_ip())) {
addr.set_port(0);
ret = ::bind(s, addr.get_sockaddr(), addr.get_sockaddr_len());
if (ret < 0) {
ret = -errno;
ldout(cct, 2) << __func__ << " client bind error " << ", " << cpp_strerror(ret) << dendl;
return ret;
}
}
}

ret = ::connect(s, addr.get_sockaddr(), addr.get_sockaddr_len());
if (ret < 0) {
Expand Down Expand Up @@ -195,14 +207,14 @@ int NetHandler::reconnect(const entity_addr_t &addr, int sd)
return 0;
}

int NetHandler::connect(const entity_addr_t &addr)
int NetHandler::connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
return generic_connect(addr, false);
return generic_connect(addr, bind_addr, false);
}

int NetHandler::nonblock_connect(const entity_addr_t &addr)
int NetHandler::nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr)
{
return generic_connect(addr, true);
return generic_connect(addr, bind_addr, true);
}


Expand Down
6 changes: 3 additions & 3 deletions src/msg/async/net_handler.h
Expand Up @@ -20,7 +20,7 @@

namespace ceph {
class NetHandler {
int generic_connect(const entity_addr_t& addr, bool nonblock);
int generic_connect(const entity_addr_t& addr, const entity_addr_t& bind_addr, bool nonblock);

CephContext *cct;
public:
Expand All @@ -29,7 +29,7 @@ namespace ceph {
int set_nonblock(int sd);
void set_close_on_exec(int sd);
int set_socket_options(int sd, bool nodelay, int size);
int connect(const entity_addr_t &addr);
int connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);

/**
* Try to reconnect the socket.
Expand All @@ -39,7 +39,7 @@ namespace ceph {
* < 0 need to goto fail
*/
int reconnect(const entity_addr_t &addr, int sd);
int nonblock_connect(const entity_addr_t &addr);
int nonblock_connect(const entity_addr_t &addr, const entity_addr_t& bind_addr);
void set_priority(int sd, int priority);
};
}
Expand Down
12 changes: 12 additions & 0 deletions src/msg/simple/Pipe.cc
Expand Up @@ -989,6 +989,18 @@ int Pipe::connect()

set_socket_options();

{
entity_addr_t addr2bind = msgr->get_myaddr();
if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
addr2bind.set_port(0);
int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
if (r < 0) {
ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
goto fail;
}
}
}

// connect!
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
Expand Down
21 changes: 21 additions & 0 deletions src/msg/simple/SimpleMessenger.cc
Expand Up @@ -306,6 +306,27 @@ int SimpleMessenger::rebind(const set<int>& avoid_ports)
return accepter.rebind(avoid_ports);
}


int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
{
lock.Lock();
if (did_bind) {
assert(my_inst.addr == bind_addr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems that miss lock.Unlock() ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

return 0;
}
if (started) {
ldout(cct,10) << "rank.bind already started" << dendl;
lock.Unlock();
return -1;
}
ldout(cct,10) << "rank.bind " << bind_addr << dendl;
lock.Unlock();

set_myaddr(bind_addr);
return 0;
}


int SimpleMessenger::start()
{
lock.Lock();
Expand Down
1 change: 1 addition & 0 deletions src/msg/simple/SimpleMessenger.h
Expand Up @@ -115,6 +115,7 @@ class SimpleMessenger : public SimplePolicyMessenger {

int bind(const entity_addr_t& bind_addr);
int rebind(const set<int>& avoid_ports);
int client_bind(const entity_addr_t& bind_addr);

/** @} Configuration functions */

Expand Down
19 changes: 12 additions & 7 deletions src/osd/OSD.cc
Expand Up @@ -836,9 +836,9 @@ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t f
release_map(next_map);
return ret;
}
ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
ret.first = osd->hb_back_client_messenger->get_connection(next_map->get_hb_back_inst(peer));
if (next_map->get_hb_front_addr(peer) != entity_addr_t())
ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
ret.second = osd->hb_front_client_messenger->get_connection(next_map->get_hb_front_inst(peer));
release_map(next_map);
return ret;
}
Expand Down Expand Up @@ -1636,7 +1636,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
int id,
Messenger *internal_messenger,
Messenger *external_messenger,
Messenger *hb_clientm,
Messenger *hb_client_front,
Messenger *hb_client_back,
Messenger *hb_front_serverm,
Messenger *hb_back_serverm,
Messenger *osdc_messenger,
Expand Down Expand Up @@ -1678,7 +1679,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false),
heartbeat_need_update(true),
hbclient_messenger(hb_clientm),
hb_front_client_messenger(hb_client_front),
hb_back_client_messenger(hb_client_back),
hb_front_server_messenger(hb_front_serverm),
hb_back_server_messenger(hb_back_serverm),
daily_loadavg(0.0),
Expand Down Expand Up @@ -2187,7 +2189,8 @@ int OSD::init()
client_messenger->add_dispatcher_head(this);
cluster_messenger->add_dispatcher_head(this);

hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);

Expand Down Expand Up @@ -2858,7 +2861,8 @@ int OSD::shutdown()
class_handler->shutdown();
client_messenger->shutdown();
cluster_messenger->shutdown();
hbclient_messenger->shutdown();
hb_front_client_messenger->shutdown();
hb_back_client_messenger->shutdown();
objecter_messenger->shutdown();
hb_front_server_messenger->shutdown();
hb_back_server_messenger->shutdown();
Expand Down Expand Up @@ -7212,7 +7216,8 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
<< " rebind hb_front_server_messenger failed" << dendl;
}

hbclient_messenger->mark_down_all();
hb_front_client_messenger->mark_down_all();
hb_back_client_messenger->mark_down_all();

reset_heartbeat_peers();
}
Expand Down