Skip to content

Commit

Permalink
msg/async/AsyncMessenger: make sure all connections closed then shutd…
Browse files Browse the repository at this point in the history
…own dq

1. ensure stop accepter before shutdown dispatcherqueue
2. ensure we don't generate new item after dispatcher queue shutdown

Signed-off-by: Haomai Wang <haomai@xsky.com>
  • Loading branch information
yuyuyu101 committed Jun 28, 2016
1 parent 820e062 commit 2f4b152
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 21 deletions.
4 changes: 2 additions & 2 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -367,9 +367,9 @@ class AsyncConnection : public Connection {
void process();
void wakeup_from(uint64_t id);
void local_deliver();
void stop() {
void stop(bool queue_reset) {
lock.Lock();
bool need_queue_reset = (state != STATE_CLOSED);
bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
lock.Unlock();
mark_down();
if (need_queue_reset)
Expand Down
30 changes: 13 additions & 17 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -536,16 +536,17 @@ int AsyncMessenger::shutdown()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;

// break ref cycles on the loopback connection
processor.stop();
mark_down_all();
dispatch_queue.shutdown();
// break ref cycles on the loopback connection
local_connection->set_priv(NULL);
pool->barrier();
// done! clean up.
processor.stop();
did_bind = false;
lock.Lock();
stop_cond.Signal();
lock.Unlock();
stopped = true;
lock.Unlock();
pool->barrier();
return 0;
}

Expand Down Expand Up @@ -618,22 +619,17 @@ void AsyncMessenger::wait()

lock.Unlock();

// done! clean up.
ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
processor.stop();
did_bind = false;
ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;

// close all connections
mark_down_all();

dispatch_queue.shutdown();
if (dispatch_queue.is_started()) {
ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
dispatch_queue.wait();
dispatch_queue.discard_local();
ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
}

// close all connections
shutdown_connections(false);

ldout(cct, 10) << __func__ << ": done." << dendl;
ldout(cct, 1) << __func__ << " complete." << dendl;
started = false;
Expand Down Expand Up @@ -774,15 +770,15 @@ int AsyncMessenger::send_keepalive(Connection *con)
return 0;
}

void AsyncMessenger::mark_down_all()
void AsyncMessenger::shutdown_connections(bool queue_reset)
{
ldout(cct,1) << __func__ << " " << dendl;
lock.Lock();
for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
q != accepting_conns.end(); ++q) {
AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
p->stop();
p->stop(queue_reset);
}
accepting_conns.clear();

Expand All @@ -792,7 +788,7 @@ void AsyncMessenger::mark_down_all()
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
p->get_perf_counter()->dec(l_msgr_active_connections);
p->stop();
p->stop(queue_reset);
}

{
Expand Down
8 changes: 6 additions & 2 deletions src/msg/async/AsyncMessenger.h
Expand Up @@ -216,8 +216,10 @@ class AsyncMessenger : public SimplePolicyMessenger {
ConnectionRef get_connection(const entity_inst_t& dest) override;
ConnectionRef get_loopback_connection() override;
int send_keepalive(Connection *con);
void mark_down(const entity_addr_t& addr) override;
void mark_down_all() override;
virtual void mark_down(const entity_addr_t& addr) override;
virtual void mark_down_all() override {
shutdown_connections(true);
}
/** @} // Connection Management */

/**
Expand Down Expand Up @@ -388,6 +390,8 @@ class AsyncMessenger : public SimplePolicyMessenger {
ms_deliver_handle_fast_connect(local_connection.get());
}

void shutdown_connections(bool queue_reset);

public:

/// con used for sending messages to ourselves
Expand Down

0 comments on commit 2f4b152

Please sign in to comment.