diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc index 604b3967295dd..4ef2fe1392335 100644 --- a/src/msg/simple/SimpleMessenger.cc +++ b/src/msg/simple/SimpleMessenger.cc @@ -87,10 +87,15 @@ int SimpleMessenger::shutdown() { ldout(cct,10) << "shutdown " << get_myaddr() << dendl; mark_down_all(); - dispatch_queue.shutdown(); // break ref cycles on the loopback connection local_connection->set_priv(NULL); + + lock.Lock(); + stop_cond.Signal(); + stopped = true; + lock.Unlock(); + return 0; } @@ -312,6 +317,7 @@ int SimpleMessenger::start() assert(!started); started = true; + stopped = false; if (!did_bind) { my_inst.addr.nonce = nonce; @@ -525,14 +531,10 @@ void SimpleMessenger::wait() lock.Unlock(); return; } - lock.Unlock(); + if (!stopped) + stop_cond.Wait(lock); - if (dispatch_queue.is_started()) { - ldout(cct,10) << "wait: waiting for dispatch queue" << dendl; - dispatch_queue.wait(); - dispatch_queue.discard_local(); - ldout(cct,10) << "wait: dispatch queue is stopped" << dendl; - } + lock.Unlock(); // done! clean up. if (did_bind) { @@ -542,6 +544,14 @@ void SimpleMessenger::wait() ldout(cct,20) << "wait: stopped accepter thread" << dendl; } + dispatch_queue.shutdown(); + if (dispatch_queue.is_started()) { + ldout(cct,10) << "wait: waiting for dispatch queue" << dendl; + dispatch_queue.wait(); + dispatch_queue.discard_local(); + ldout(cct,10) << "wait: dispatch queue is stopped" << dendl; + } + if (reaper_started) { ldout(cct,20) << "wait: stopping reaper thread" << dendl; lock.Lock(); diff --git a/src/msg/simple/SimpleMessenger.h b/src/msg/simple/SimpleMessenger.h index 3dbdad0f8c635..2f4685bedc404 100644 --- a/src/msg/simple/SimpleMessenger.h +++ b/src/msg/simple/SimpleMessenger.h @@ -305,6 +305,9 @@ class SimpleMessenger : public SimplePolicyMessenger { /// internal cluster protocol version, if any, for talking to entities of the same type. int cluster_protocol; + Cond stop_cond; + bool stopped = true; + bool reaper_started, reaper_stop; Cond reaper_cond;