Skip to content

Commit

Permalink
msg/simple: wait dispatch_queue until all pipes closed
Browse files Browse the repository at this point in the history
Now we use dispatch_queue.wait to wait for SimpleMessenger shutdown,
but we need to ensure DispatchQueue can process event after Accepter down,
Otherwise accepter may continue to accept new connection which may queue
new item. so we can't rely on DispatchQueue now.

Introduce stop_cond and stop flag to indicate this function like
AsyncMessenger did

Fixes: http://tracker.ceph.com/issues/16472
Signed-off-by: Haomai Wang <haomai@xsky.com>
  • Loading branch information
yuyuyu101 committed Aug 17, 2016
1 parent e354918 commit 4ee47ab
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
26 changes: 18 additions & 8 deletions src/msg/simple/SimpleMessenger.cc
Expand Up @@ -87,10 +87,15 @@ int SimpleMessenger::shutdown()
{
ldout(cct,10) << "shutdown " << get_myaddr() << dendl;
mark_down_all();
dispatch_queue.shutdown();

// break ref cycles on the loopback connection
local_connection->set_priv(NULL);

lock.Lock();
stop_cond.Signal();
stopped = true;
lock.Unlock();

return 0;
}

Expand Down Expand Up @@ -312,6 +317,7 @@ int SimpleMessenger::start()

assert(!started);
started = true;
stopped = false;

if (!did_bind) {
my_inst.addr.nonce = nonce;
Expand Down Expand Up @@ -525,14 +531,10 @@ void SimpleMessenger::wait()
lock.Unlock();
return;
}
lock.Unlock();
if (!stopped)
stop_cond.Wait(lock);

if (dispatch_queue.is_started()) {
ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
dispatch_queue.wait();
dispatch_queue.discard_local();
ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
}
lock.Unlock();

// done! clean up.
if (did_bind) {
Expand All @@ -542,6 +544,14 @@ void SimpleMessenger::wait()
ldout(cct,20) << "wait: stopped accepter thread" << dendl;
}

dispatch_queue.shutdown();
if (dispatch_queue.is_started()) {
ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
dispatch_queue.wait();
dispatch_queue.discard_local();
ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
}

if (reaper_started) {
ldout(cct,20) << "wait: stopping reaper thread" << dendl;
lock.Lock();
Expand Down
3 changes: 3 additions & 0 deletions src/msg/simple/SimpleMessenger.h
Expand Up @@ -305,6 +305,9 @@ class SimpleMessenger : public SimplePolicyMessenger {
/// internal cluster protocol version, if any, for talking to entities of the same type.
int cluster_protocol;

Cond stop_cond;
bool stopped = true;

bool reaper_started, reaper_stop;
Cond reaper_cond;

Expand Down

0 comments on commit 4ee47ab

Please sign in to comment.