Skip to content

Commit

Permalink
msg/async: set nonce before starting the workers
Browse files Browse the repository at this point in the history
otherwise workers will respond with difference nonces to peers.
and remove nonce from Processor. as there is only one nonce for each
Messenger at a given time.

Signed-off-by: Kefu Chai <kchai@redhat.com>
  • Loading branch information
tchaikov committed Dec 8, 2016
1 parent 7f5685e commit aac1a3e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 53 deletions.
103 changes: 53 additions & 50 deletions src/msg/async/AsyncMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ class Processor::C_processor_accept : public EventCallback {
};

Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n)
: msgr(r), net(c), worker(w), nonce(n),
: msgr(r), net(c), worker(w),
listen_handler(new C_processor_accept(this)) {}

int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
int Processor::bind(const entity_addr_t &bind_addr,
const set<int>& avoid_ports,
entity_addr_t* bound_addr)
{
const md_config_t *conf = msgr->cct->_conf;
// bind to a socket
Expand Down Expand Up @@ -137,42 +139,10 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
}

ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;

msgr->set_myaddr(bind_addr);
if (bind_addr != entity_addr_t())
msgr->learned_addr(bind_addr);

if (msgr->get_myaddr().get_port() == 0) {
msgr->set_myaddr(listen_addr);
}
entity_addr_t addr = msgr->get_myaddr();
addr.nonce = nonce;
msgr->set_myaddr(addr);

msgr->init_local_connection();

ldout(msgr->cct,1) << __func__ << " bind my_inst.addr is " << msgr->get_myaddr() << dendl;
*bound_addr = listen_addr;
return 0;
}

int Processor::rebind(const set<int>& avoid_ports)
{
ldout(msgr->cct, 1) << __func__ << " rebind avoid " << avoid_ports << dendl;

entity_addr_t addr = msgr->get_myaddr();
set<int> new_avoid = avoid_ports;
new_avoid.insert(addr.get_port());
addr.set_port(0);

// adjust the nonce; we want our entity_addr_t to be truly unique.
nonce += 1000000;
msgr->my_inst.addr.nonce = nonce;
ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;

ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl;
return bind(addr, new_avoid);
}

void Processor::start()
{
ldout(msgr->cct, 1) << __func__ << dendl;
Expand Down Expand Up @@ -346,11 +316,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)

// bind to a socket
set<int> avoid_ports;
int r = 0;
entity_addr_t bound_addr;
unsigned i = 0;
for (auto &&p : processors) {
r = p->bind(bind_addr, avoid_ports);
if (r < 0) {
int r = p->bind(bind_addr, avoid_ports, &bound_addr);
if (r) {
// Note: this is related to local tcp listen table problem.
// Posix(default kernel implementation) backend shares listen table
// in the kernel, so all threads can use the same listen table naturally
Expand All @@ -361,13 +331,12 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
// but the second worker failed, it's not expected and we need to assert
// here
assert(i == 0);
break;
return r;
}
++i;
}
if (r >= 0)
did_bind = true;
return r;
_finish_bind(bind_addr, bound_addr);
return 0;
}

int AsyncMessenger::rebind(const set<int>& avoid_ports)
Expand All @@ -378,19 +347,53 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
for (auto &&p : processors)
p->stop();
mark_down_all();

// adjust the nonce; we want our entity_addr_t to be truly unique.
nonce += 1000000;
ldout(cct, 10) << __func__ << " new nonce " << nonce
<< " and inst " << get_myinst() << dendl;

entity_addr_t bound_addr;
entity_addr_t bind_addr = get_myaddr();
bind_addr.set_port(0);
set<int> new_avoid(avoid_ports);
new_avoid.insert(bind_addr.get_port());
ldout(cct, 10) << __func__ << " will try " << bind_addr
<< " and avoid ports " << new_avoid << dendl;
unsigned i = 0;
int r = 0;
for (auto &&p : processors) {
r = p->rebind(avoid_ports);
if (r == 0) {
p->start();
} else {
int r = p->bind(bind_addr, avoid_ports, &bound_addr);
if (r) {
assert(i == 0);
break;
return r;
}
i++;
++i;
}
_finish_bind(bind_addr, bound_addr);
for (auto &&p : processors) {
p->start();
}
return 0;
}

void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
const entity_addr_t& listen_addr)
{
set_myaddr(bind_addr);
if (bind_addr != entity_addr_t())
learned_addr(bind_addr);

if (get_myaddr().get_port() == 0) {
set_myaddr(listen_addr);
}
return r;
entity_addr_t addr = get_myaddr();
addr.set_nonce(nonce);
set_myaddr(addr);

init_local_connection();

ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
did_bind = true;
}

int AsyncMessenger::start()
Expand Down
8 changes: 5 additions & 3 deletions src/msg/async/AsyncMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class Processor {
NetHandler net;
Worker *worker;
ServerSocket listen_socket;
uint64_t nonce;
EventCallbackRef listen_handler;

class C_processor_accept;
Expand All @@ -58,8 +57,9 @@ class Processor {
~Processor() { delete listen_handler; };

void stop();
int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
int rebind(const set<int>& avoid_port);
int bind(const entity_addr_t &bind_addr,
const set<int>& avoid_ports,
entity_addr_t* bound_addr);
void start();
void accept();
};
Expand Down Expand Up @@ -210,6 +210,8 @@ class AsyncMessenger : public SimplePolicyMessenger {
const entity_addr_t& dest_addr, int dest_type);

int _send_message(Message *m, const entity_inst_t& dest);
void _finish_bind(const entity_addr_t& bind_addr,
const entity_addr_t& listen_addr);

private:
static const uint64_t ReapDeadConnectionThreshold = 5;
Expand Down

0 comments on commit aac1a3e

Please sign in to comment.