diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 36c8f2d7de10d2..af1747951c0c26 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -367,9 +367,9 @@ class AsyncConnection : public Connection { void process(); void wakeup_from(uint64_t id); void local_deliver(); - void stop() { + void stop(bool queue_reset) { lock.Lock(); - bool need_queue_reset = (state != STATE_CLOSED); + bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; lock.Unlock(); mark_down(); if (need_queue_reset) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 52e6a1d95600eb..8c2f657b0a71d0 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -536,16 +536,17 @@ int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; - // break ref cycles on the loopback connection - processor.stop(); mark_down_all(); - dispatch_queue.shutdown(); + // break ref cycles on the loopback connection local_connection->set_priv(NULL); - pool->barrier(); + // done! clean up. + processor.stop(); + did_bind = false; lock.Lock(); stop_cond.Signal(); - lock.Unlock(); stopped = true; + lock.Unlock(); + pool->barrier(); return 0; } @@ -618,15 +619,7 @@ void AsyncMessenger::wait() lock.Unlock(); - // done! clean up. - ldout(cct,20) << __func__ << ": stopping processor thread" << dendl; - processor.stop(); - did_bind = false; - ldout(cct,20) << __func__ << ": stopped processor thread" << dendl; - - // close all connections - mark_down_all(); - + dispatch_queue.shutdown(); if (dispatch_queue.is_started()) { ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl; dispatch_queue.wait(); @@ -634,6 +627,9 @@ void AsyncMessenger::wait() ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl; } + // close all connections + shutdown_connections(false); + ldout(cct, 10) << __func__ << ": done." << dendl; ldout(cct, 1) << __func__ << " complete." << dendl; started = false; @@ -774,7 +770,7 @@ int AsyncMessenger::send_keepalive(Connection *con) return 0; } -void AsyncMessenger::mark_down_all() +void AsyncMessenger::shutdown_connections(bool queue_reset) { ldout(cct,1) << __func__ << " " << dendl; lock.Lock(); @@ -782,7 +778,7 @@ void AsyncMessenger::mark_down_all() q != accepting_conns.end(); ++q) { AsyncConnectionRef p = *q; ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl; - p->stop(); + p->stop(queue_reset); } accepting_conns.clear(); @@ -792,7 +788,7 @@ void AsyncMessenger::mark_down_all() ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl; conns.erase(it); p->get_perf_counter()->dec(l_msgr_active_connections); - p->stop(); + p->stop(queue_reset); } { diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index f96601207d5d0f..0b2872ad7f4af6 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -216,8 +216,10 @@ class AsyncMessenger : public SimplePolicyMessenger { ConnectionRef get_connection(const entity_inst_t& dest) override; ConnectionRef get_loopback_connection() override; int send_keepalive(Connection *con); - void mark_down(const entity_addr_t& addr) override; - void mark_down_all() override; + virtual void mark_down(const entity_addr_t& addr) override; + virtual void mark_down_all() override { + shutdown_connections(true); + } /** @} // Connection Management */ /** @@ -388,6 +390,8 @@ class AsyncMessenger : public SimplePolicyMessenger { ms_deliver_handle_fast_connect(local_connection.get()); } + void shutdown_connections(bool queue_reset); + public: /// con used for sending messages to ourselves