Merge pull request #9783 from yuyuyu101/wip-16366
msg/async: Support close idle connection feature
yuyuyu101 committed Jun 29, 2016
2 parents 4c26fae + 0001a07 commit 593008e
Showing 9 changed files with 338 additions and 238 deletions.
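In outline, the change works like this: every AsyncConnection now remembers the Worker it was bound to, stamps last_active each time process() runs, and arms a tick_handler time event on its EventCenter once the connection becomes ready. When the timer fires, tick() compares the elapsed idle time against ms_tcp_read_timeout converted to microseconds (inactive_timeout_us) and either marks the connection faulted or re-arms the timer. The sketch below restates that check outside Ceph's event machinery; IdleChecker, touch(), on_fault and the 2-second timeout are illustrative stand-ins, not names from the patch.

// Minimal standalone sketch of the idle-timeout check this commit introduces.
// IdleChecker and on_fault are invented for illustration; the real logic lives
// in AsyncConnection::tick() and uses ceph::coarse_mono_clock plus EventCenter
// time events.
#include <chrono>
#include <cstdint>
#include <functional>
#include <iostream>

class IdleChecker {
  using clock = std::chrono::steady_clock;          // stand-in for ceph::coarse_mono_clock
  clock::time_point last_active_ = clock::now();    // refreshed whenever the connection sees traffic
  uint64_t inactive_timeout_us_;                    // derived from ms_tcp_read_timeout in the patch
  std::function<void()> on_fault_;                  // action when the connection has gone idle

 public:
  IdleChecker(uint64_t timeout_us, std::function<void()> on_fault)
    : inactive_timeout_us_(timeout_us), on_fault_(std::move(on_fault)) {}

  // Analogous to process() updating last_active: any activity resets the idle clock.
  void touch() { last_active_ = clock::now(); }

  // Analogous to tick(): returns true if the periodic timer should be re-armed.
  bool tick() {
    auto idle_us = std::chrono::duration_cast<std::chrono::microseconds>(
                       clock::now() - last_active_).count();
    if (inactive_timeout_us_ < static_cast<uint64_t>(idle_us)) {
      on_fault_();   // in the real code: fault(), which tears the session down
      return false;  // connection is going away, do not re-arm
    }
    return true;     // still active, schedule the next tick
  }
};

int main() {
  // Arbitrary 2-second timeout purely for demonstration.
  IdleChecker checker(2 * 1000 * 1000,
                      [] { std::cout << "idle too long, mark self fault\n"; });
  checker.touch();
  std::cout << "re-arm tick: " << std::boolalpha << checker.tick() << "\n";
  return 0;
}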
115 changes: 76 additions & 39 deletions src/msg/async/AsyncConnection.cc
@@ -87,6 +87,16 @@ class C_clean_handler : public EventCallback {
}
};

class C_tick_wakeup : public EventCallback {
AsyncConnectionRef conn;

public:
explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
void do_request(int fd_or_id) {
conn->tick(fd_or_id);
}
};

static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
// create a buffer to read into that matches the data alignment
@@ -109,19 +119,24 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
}

AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
EventCenter *c, PerfCounters *p)
Worker *w)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
recv_start(0), recv_end(0),
last_active(ceph::coarse_mono_clock::now()),
inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
worker(w), center(&w->center)
{
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
wakeup_handler = new C_time_wakeup(this);
tick_handler = new C_tick_wakeup(this);
memset(msgvec, 0, sizeof(msgvec));
// double recv_max_prefetch see "read_until"
recv_buf = new char[2*recv_max_prefetch];
@@ -459,6 +474,7 @@ void AsyncConnection::process()
int prev_state = state;
bool already_dispatch_writer = false;
Mutex::Locker l(lock);
last_active = ceph::coarse_mono_clock::now();
do {
ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
prev_state = state;
@@ -1288,9 +1304,15 @@ ssize_t AsyncConnection::_process_connection()
session_security.reset();
}

if (delay_state)
assert(delay_state->ready());
dispatch_queue->queue_connect(this);
async_msgr->ms_deliver_handle_fast_connect(this);

// make sure no pending tick timer
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

// message may in queue between last _try_send and connection ready
// write event may already notify and we need to force scheduler again
write_lock.Lock();
@@ -1458,6 +1480,13 @@ ssize_t AsyncConnection::_process_connection()
ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
state = STATE_OPEN;
memset(&connect_msg, 0, sizeof(connect_msg));

if (delay_state)
assert(delay_state->ready());
// make sure no pending tick timer
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

write_lock.Lock();
can_write = WriteStatus::CANWRITE;
if (is_queued())
@@ -1798,17 +1827,24 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
assert(recv_start == recv_end);

existing->write_lock.Unlock();
if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
ldout(async_msgr->cct, 0) << __func__ << " reply fault for existing connection." << dendl;
existing->fault();
}

ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
_stop();
// queue a reset on the new connection, which we're dumping for the old
dispatch_queue->queue_reset(this);
int new_fd = existing->sd;
center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable {
Mutex::Locker l(existing->lock);
if (new_fd != existing->sd)
return ;

if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
existing->fault();
}
}, true);
existing->lock.Unlock();

return 0;
}
existing->lock.Unlock();
@@ -2202,6 +2238,7 @@ void AsyncConnection::_stop()
dispatch_queue->discard_queue(conn_id);
discard_out_queue();
async_msgr->unregister_conn(this);
worker->release_worker();

state = STATE_CLOSED;
open_write = false;
@@ -2212,9 +2249,6 @@ void AsyncConnection::_stop()
::close(sd);
}
sd = -1;
for (set<uint64_t>::iterator it = register_time_events.begin();
it != register_time_events.end(); ++it)
center->delete_time_event(*it);
// Make sure in-queue events will been processed
center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));

@@ -2402,6 +2436,8 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
{
Mutex::Locker l(delay_lock);
register_time_events.erase(id);
if (stop_dispatch)
return ;
if (delay_queue.empty())
return ;
utime_t release = delay_queue.front().first;
@@ -2422,16 +2458,11 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
}
}

class C_flush_messages : public EventCallback {
std::deque<std::pair<utime_t, Message*> > delay_queue;
AsyncMessenger *msgr;
DispatchQueue *dispatch_queue;
uint64_t conn_id;
public:
C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
DispatchQueue *dq, uint64_t cid)
: delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
void do_request(int id) {
void AsyncConnection::DelayedDelivery::flush() {
stop_dispatch = true;
center->submit_to(
center->get_id(), [this] () mutable {
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
if (msgr->ms_can_fast_dispatch(m)) {
@@ -2441,17 +2472,11 @@ class C_flush_messages : public EventCallback {
}
delay_queue.pop_front();
}
delete this;
}
};

void AsyncConnection::DelayedDelivery::flush() {
Mutex::Locker l(delay_lock);
center->dispatch_event_external(
new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
stop_dispatch = false;
}, true);
}

void AsyncConnection::send_keepalive()
@@ -2471,12 +2496,6 @@ void AsyncConnection::mark_down()
_stop();
}

void AsyncConnection::release_worker()
{
if (msgr)
reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center);
}

void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
{
assert(write_lock.is_locked());
@@ -2587,3 +2606,21 @@ void AsyncConnection::wakeup_from(uint64_t id)
lock.Unlock();
process();
}

void AsyncConnection::tick(uint64_t id)
{
auto now = ceph::coarse_mono_clock::now();
ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
<< " last_active" << last_active << dendl;
assert(last_tick_id == id);
Mutex::Locker l(lock);
auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
if (inactive_timeout_us < (uint64_t)idle_period) {
ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
<< inactive_timeout_us
<< " us, mark self fault." << dendl;
fault();
} else if (is_connected()) {
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
}
}
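A second detail worth noting in the AsyncConnection.cc hunks above: handle_connect_msg no longer sends the CEPH_MSGR_TAG_RETRY_GLOBAL reply on the existing connection inline. It snapshots existing->sd, hands the reply off to the existing connection's own event center via submit_to(), and the deferred lambda re-takes existing->lock and bails out if the fd has changed, so a connection that was replaced or closed in the meantime is left untouched. Below is a stand-alone restatement of that capture-and-revalidate pattern; FakeCenter, Conn and reply_retry_global are invented names, not Ceph API.

// Sketch of the "capture the fd, revalidate under the lock" pattern used when
// deferring the RETRY_GLOBAL reply to the existing connection's own thread.
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>

struct FakeCenter {
  std::queue<std::function<void()>> tasks;
  void submit(std::function<void()> fn) { tasks.push(std::move(fn)); }
  void run_pending() {
    while (!tasks.empty()) { tasks.front()(); tasks.pop(); }
  }
};

struct Conn {
  std::mutex lock;
  int sd = 42;                       // socket fd owned by this connection
  void reply_retry_global() { std::cout << "reply RETRY_GLOBAL on fd " << sd << "\n"; }
};

int main() {
  FakeCenter center;
  Conn existing;

  int captured_fd = existing.sd;     // snapshot taken while existing->lock is still held
  center.submit([&existing, captured_fd] {
    std::lock_guard<std::mutex> l(existing.lock);
    if (captured_fd != existing.sd)  // socket was replaced or closed in the meantime: do nothing
      return;
    existing.reply_retry_global();   // still the same socket we validated against
  });

  center.run_pending();              // later, on the existing connection's own thread
  return 0;
}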
46 changes: 32 additions & 14 deletions src/msg/async/AsyncConnection.h
@@ -26,6 +26,7 @@
using namespace std;

#include "auth/AuthSessionHandler.h"
#include "common/ceph_time.h"
#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "include/buffer.h"
@@ -36,6 +37,7 @@ using namespace std;
#include "net_handler.h"

class AsyncMessenger;
class Worker;

static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);

@@ -142,12 +144,14 @@ class AsyncConnection : public Connection {
EventCenter *center;
DispatchQueue *dispatch_queue;
uint64_t conn_id;
std::atomic_bool stop_dispatch;

public:
explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
DispatchQueue *q, uint64_t cid)
: delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
stop_dispatch(false) { }
~DelayedDelivery() {
assert(register_time_events.empty());
assert(delay_queue.empty());
@@ -159,22 +163,27 @@ class AsyncConnection : public Connection {
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard() {
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
delay_queue.pop_front();
}
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
stop_dispatch = true;
center->submit_to(center->get_id(), [this] () mutable {
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
delay_queue.pop_front();
}
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
stop_dispatch = false;
}, true);
}
bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
void flush();
} *delay_state;

public:
AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, EventCenter *c, PerfCounters *p);
AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
~AsyncConnection();
void maybe_start_delay_thread();

@@ -202,8 +211,6 @@ class AsyncConnection : public Connection {
policy.lossy = true;
}

void release_worker();

private:
enum {
STATE_NONE,
@@ -312,12 +319,16 @@ class AsyncConnection : public Connection {
EventCallbackRef read_handler;
EventCallbackRef write_handler;
EventCallbackRef wakeup_handler;
EventCallbackRef tick_handler;
struct iovec msgvec[ASYNC_IOV_MAX];
char *recv_buf;
uint32_t recv_max_prefetch;
uint32_t recv_start;
uint32_t recv_end;
set<uint64_t> register_time_events; // need to delete it if stop
ceph::coarse_mono_clock::time_point last_active;
uint64_t last_tick_id = 0;
const uint64_t inactive_timeout_us;

// Tis section are temp variables used by state transition

@@ -352,6 +363,7 @@ class AsyncConnection : public Connection {
// used only by "read_until"
uint64_t state_offset;
NetHandler net;
Worker *worker;
EventCenter *center;
ceph::shared_ptr<AuthSessionHandler> session_security;

@@ -366,6 +378,7 @@ class AsyncConnection : public Connection {
void handle_write();
void process();
void wakeup_from(uint64_t id);
void tick(uint64_t id);
void local_deliver();
void stop(bool queue_reset) {
lock.Lock();
@@ -376,9 +389,14 @@
dispatch_queue->queue_reset(this);
}
void cleanup_handler() {
for (auto &&t : register_time_events)
center->delete_time_event(t);
register_time_events.clear();
center->delete_time_event(last_tick_id);
delete read_handler;
delete write_handler;
delete wakeup_handler;
delete tick_handler;
if (delay_state) {
delete delay_state;
delay_state = NULL;
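The DelayedDelivery changes in both files follow the same idea as the deferred reply above: flush() and discard() raise an atomic stop_dispatch flag, push the actual queue draining onto the owning event center with submit_to(), and clear the flag when done, while do_request() simply returns early whenever stop_dispatch is set. A simplified, synchronous version of that guard (DelayedQueue and deliver() are illustrative, not the Ceph types):

// Sketch of the stop_dispatch guard: the flush path raises the flag, drains
// the queue, then lowers it so the timer callback can dispatch again.
#include <atomic>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>

class DelayedQueue {
  std::mutex lock_;
  std::deque<std::string> queue_;
  std::atomic_bool stop_dispatch_{false};

  void deliver(const std::string& m) { std::cout << "deliver " << m << "\n"; }

 public:
  void queue(std::string m) {
    std::lock_guard<std::mutex> l(lock_);
    queue_.push_back(std::move(m));
  }

  // Timer callback: skip delivery entirely while a flush is in progress.
  void do_request() {
    std::lock_guard<std::mutex> l(lock_);
    if (stop_dispatch_ || queue_.empty())
      return;
    deliver(queue_.front());
    queue_.pop_front();
  }

  // Drain everything immediately; in the real code the body runs via
  // center->submit_to() on the owning event-center thread.
  void flush() {
    stop_dispatch_ = true;
    {
      std::lock_guard<std::mutex> l(lock_);
      while (!queue_.empty()) {
        deliver(queue_.front());
        queue_.pop_front();
      }
    }
    stop_dispatch_ = false;
  }
};

int main() {
  DelayedQueue q;
  q.queue("ping");
  q.flush();        // delivers "ping" right away
  q.do_request();   // nothing left to do
  return 0;
}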
