Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg/async: Support close idle connection feature #9783

Merged
merged 28 commits into from Jun 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8f0cd8e
Event: use multimap instead of map
yuyuyu101 May 24, 2016
d493b6b
Event: remove clock skew detect since we use mono clock now
yuyuyu101 May 24, 2016
7ca8839
Event: use local_center instead of pthread id
yuyuyu101 May 24, 2016
ba470ce
Event: remove time_lock which is no need
yuyuyu101 Mar 8, 2016
e515c9d
Event: replace atomic_t with c++ atomic
yuyuyu101 Mar 8, 2016
eebc6c2
Event: replace Mutex with c++ mutex
yuyuyu101 Mar 8, 2016
018b48c
Event: remove extra header files
yuyuyu101 May 24, 2016
b58f6df
Event: remove next_time which isn't needed
yuyuyu101 May 24, 2016
6e24d9a
Event: implement effective delete_time_event
yuyuyu101 May 24, 2016
fe6564c
AsyncConnection: make connection timeout when idle a lot
yuyuyu101 May 24, 2016
dd9c43a
test_async_driver: add missing headers
yuyuyu101 May 24, 2016
fa4767c
AsyncConnection: add tick timer
yuyuyu101 May 24, 2016
9d952ad
test_msgr: add connection read timeout tests
yuyuyu101 May 24, 2016
761f147
AsyncConnection: move delete timers to cleanup handler
yuyuyu101 May 25, 2016
4dc2e13
AsyncConnection: add EventCenter id
yuyuyu101 May 25, 2016
c33753e
Event: add submit_to apis
yuyuyu101 May 25, 2016
ef7c3e3
AsyncConnection: execute existing fault in another
yuyuyu101 May 25, 2016
cb58088
Event: change id to idx
yuyuyu101 May 25, 2016
65874f1
perf_local: fix api adjust
yuyuyu101 May 25, 2016
fb480fd
AsyncConnection: make delete_time_event async
yuyuyu101 May 25, 2016
8256211
AsyncConnection: ensure delay_state empty when building a new session
yuyuyu101 May 26, 2016
8907f52
AsyncMessenger: remove extra release_worker path
yuyuyu101 May 27, 2016
84e19b5
Event: avoid multi global conflict
yuyuyu101 Jun 4, 2016
4b5bb65
Event: change submit_to related to cephcontext
yuyuyu101 May 28, 2016
1f65a35
msg/async/Event: don't execute inline if nonwait
yuyuyu101 Jun 19, 2016
be8dec5
msg/async/AsyncConnection: avoid dup RETRYGLOBAL sending
yuyuyu101 Jun 26, 2016
ada84e7
msg/async/Event: change to use pthread_t to indicate whether the same…
yuyuyu101 Jun 28, 2016
0001a07
Revert "msg/AsyncMessenger: move Worker class to cc file"
yuyuyu101 Jun 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
115 changes: 76 additions & 39 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -87,6 +87,16 @@ class C_clean_handler : public EventCallback {
}
};

// Event callback that forwards a fired time event to
// AsyncConnection::tick(), which checks the connection for idleness.
class C_tick_wakeup : public EventCallback {
  AsyncConnectionRef conn;  // ref keeps the connection alive while the event is pending

 public:
  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
  // id: the time-event id that fired; handed through so tick() can
  // match it against the connection's last registered tick id.
  void do_request(int id) {
    conn->tick(id);
  }
};

static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
// create a buffer to read into that matches the data alignment
Expand All @@ -109,19 +119,24 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
}

AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
EventCenter *c, PerfCounters *p)
Worker *w)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
recv_start(0), recv_end(0),
last_active(ceph::coarse_mono_clock::now()),
inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
worker(w), center(&w->center)
{
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
wakeup_handler = new C_time_wakeup(this);
tick_handler = new C_tick_wakeup(this);
memset(msgvec, 0, sizeof(msgvec));
// double recv_max_prefetch see "read_until"
recv_buf = new char[2*recv_max_prefetch];
Expand Down Expand Up @@ -459,6 +474,7 @@ void AsyncConnection::process()
int prev_state = state;
bool already_dispatch_writer = false;
Mutex::Locker l(lock);
last_active = ceph::coarse_mono_clock::now();
do {
ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
prev_state = state;
Expand Down Expand Up @@ -1288,9 +1304,15 @@ ssize_t AsyncConnection::_process_connection()
session_security.reset();
}

if (delay_state)
assert(delay_state->ready());
dispatch_queue->queue_connect(this);
async_msgr->ms_deliver_handle_fast_connect(this);

// make sure no pending tick timer
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

// messages may be queued between the last _try_send and the connection becoming ready
// write event may already notify and we need to force scheduler again
write_lock.Lock();
Expand Down Expand Up @@ -1458,6 +1480,13 @@ ssize_t AsyncConnection::_process_connection()
ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
state = STATE_OPEN;
memset(&connect_msg, 0, sizeof(connect_msg));

if (delay_state)
assert(delay_state->ready());
// make sure no pending tick timer
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

write_lock.Lock();
can_write = WriteStatus::CANWRITE;
if (is_queued())
Expand Down Expand Up @@ -1798,17 +1827,24 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
assert(recv_start == recv_end);

existing->write_lock.Unlock();
if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
ldout(async_msgr->cct, 0) << __func__ << " reply fault for existing connection." << dendl;
existing->fault();
}

ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
_stop();
// queue a reset on the new connection, which we're dumping for the old
dispatch_queue->queue_reset(this);
int new_fd = existing->sd;
center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable {
Mutex::Locker l(existing->lock);
if (new_fd != existing->sd)
return ;

if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
existing->fault();
}
}, true);
existing->lock.Unlock();

return 0;
}
existing->lock.Unlock();
Expand Down Expand Up @@ -2202,6 +2238,7 @@ void AsyncConnection::_stop()
dispatch_queue->discard_queue(conn_id);
discard_out_queue();
async_msgr->unregister_conn(this);
worker->release_worker();

state = STATE_CLOSED;
open_write = false;
Expand All @@ -2212,9 +2249,6 @@ void AsyncConnection::_stop()
::close(sd);
}
sd = -1;
for (set<uint64_t>::iterator it = register_time_events.begin();
it != register_time_events.end(); ++it)
center->delete_time_event(*it);
// Make sure in-queue events will been processed
center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));

Expand Down Expand Up @@ -2402,6 +2436,8 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
{
Mutex::Locker l(delay_lock);
register_time_events.erase(id);
if (stop_dispatch)
return ;
if (delay_queue.empty())
return ;
utime_t release = delay_queue.front().first;
Expand All @@ -2422,16 +2458,11 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
}
}

class C_flush_messages : public EventCallback {
std::deque<std::pair<utime_t, Message*> > delay_queue;
AsyncMessenger *msgr;
DispatchQueue *dispatch_queue;
uint64_t conn_id;
public:
C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
DispatchQueue *dq, uint64_t cid)
: delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
void do_request(int id) {
void AsyncConnection::DelayedDelivery::flush() {
stop_dispatch = true;
center->submit_to(
center->get_id(), [this] () mutable {
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
if (msgr->ms_can_fast_dispatch(m)) {
Expand All @@ -2441,17 +2472,11 @@ class C_flush_messages : public EventCallback {
}
delay_queue.pop_front();
}
delete this;
}
};

void AsyncConnection::DelayedDelivery::flush() {
Mutex::Locker l(delay_lock);
center->dispatch_event_external(
new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
stop_dispatch = false;
}, true);
}

void AsyncConnection::send_keepalive()
Expand All @@ -2471,12 +2496,6 @@ void AsyncConnection::mark_down()
_stop();
}

// Give the owning messenger a chance to rebalance: hand this
// connection's event center back to the AsyncMessenger's worker pool.
// No-op if the connection has no messenger attached.
void AsyncConnection::release_worker()
{
  if (msgr)
    // static_cast, not reinterpret_cast: this is a downcast within the
    // Messenger class hierarchy, and reinterpret_cast would yield a
    // wrong pointer under multiple inheritance.
    static_cast<AsyncMessenger*>(msgr)->release_worker(center);
}

void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
{
assert(write_lock.is_locked());
Expand Down Expand Up @@ -2587,3 +2606,21 @@ void AsyncConnection::wakeup_from(uint64_t id)
lock.Unlock();
process();
}

// Timer callback: fault the connection if it has been idle longer than
// inactive_timeout_us, otherwise re-arm the tick timer.
//
// id: the time-event id that fired; must match last_tick_id, since any
//     stale timer is deleted before a new one is registered.
void AsyncConnection::tick(uint64_t id)
{
  auto now = ceph::coarse_mono_clock::now();
  // Acquire the connection lock BEFORE reading last_tick_id/last_active:
  // both are mutated under `lock` by other threads (process(), the
  // connect/accept paths), so the unlocked read here was racy.
  Mutex::Locker l(lock);
  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
                             << " last_active=" << last_active << dendl;
  assert(last_tick_id == id);
  auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
  if (inactive_timeout_us < (uint64_t)idle_period) {
    ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
                              << inactive_timeout_us
                              << " us, mark self fault." << dendl;
    fault();
  } else if (is_connected()) {
    // Still alive: schedule the next idle check.
    last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
  }
}
46 changes: 32 additions & 14 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -26,6 +26,7 @@
using namespace std;

#include "auth/AuthSessionHandler.h"
#include "common/ceph_time.h"
#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "include/buffer.h"
Expand All @@ -36,6 +37,7 @@ using namespace std;
#include "net_handler.h"

class AsyncMessenger;
class Worker;

static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);

Expand Down Expand Up @@ -142,12 +144,14 @@ class AsyncConnection : public Connection {
EventCenter *center;
DispatchQueue *dispatch_queue;
uint64_t conn_id;
std::atomic_bool stop_dispatch;

public:
explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
DispatchQueue *q, uint64_t cid)
: delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
stop_dispatch(false) { }
~DelayedDelivery() {
assert(register_time_events.empty());
assert(delay_queue.empty());
Expand All @@ -159,22 +163,27 @@ class AsyncConnection : public Connection {
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard() {
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
delay_queue.pop_front();
}
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
stop_dispatch = true;
center->submit_to(center->get_id(), [this] () mutable {
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
delay_queue.pop_front();
}
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
stop_dispatch = false;
}, true);
}
bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
void flush();
} *delay_state;

public:
AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, EventCenter *c, PerfCounters *p);
AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
~AsyncConnection();
void maybe_start_delay_thread();

Expand Down Expand Up @@ -202,8 +211,6 @@ class AsyncConnection : public Connection {
policy.lossy = true;
}

void release_worker();

private:
enum {
STATE_NONE,
Expand Down Expand Up @@ -312,12 +319,16 @@ class AsyncConnection : public Connection {
EventCallbackRef read_handler;
EventCallbackRef write_handler;
EventCallbackRef wakeup_handler;
EventCallbackRef tick_handler;
struct iovec msgvec[ASYNC_IOV_MAX];
char *recv_buf;
uint32_t recv_max_prefetch;
uint32_t recv_start;
uint32_t recv_end;
set<uint64_t> register_time_events; // need to delete it if stop
ceph::coarse_mono_clock::time_point last_active;
uint64_t last_tick_id = 0;
const uint64_t inactive_timeout_us;

// This section holds temporary variables used by state transitions

Expand Down Expand Up @@ -352,6 +363,7 @@ class AsyncConnection : public Connection {
// used only by "read_until"
uint64_t state_offset;
NetHandler net;
Worker *worker;
EventCenter *center;
ceph::shared_ptr<AuthSessionHandler> session_security;

Expand All @@ -366,6 +378,7 @@ class AsyncConnection : public Connection {
void handle_write();
void process();
void wakeup_from(uint64_t id);
void tick(uint64_t id);
void local_deliver();
void stop(bool queue_reset) {
lock.Lock();
Expand All @@ -376,9 +389,14 @@ class AsyncConnection : public Connection {
dispatch_queue->queue_reset(this);
}
void cleanup_handler() {
for (auto &&t : register_time_events)
center->delete_time_event(t);
register_time_events.clear();
center->delete_time_event(last_tick_id);
delete read_handler;
delete write_handler;
delete wakeup_handler;
delete tick_handler;
if (delay_state) {
delete delay_state;
delay_state = NULL;
Expand Down