diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc
index 00f7d42912efd..6f7a38d0acc1a 100644
--- a/src/msg/async/AsyncConnection.cc
+++ b/src/msg/async/AsyncConnection.cc
@@ -87,6 +87,16 @@ class C_clean_handler : public EventCallback {
   }
 };
 
+class C_tick_wakeup : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
+  void do_request(int fd_or_id) {
+    conn->tick(fd_or_id);
+  }
+};
+
 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 {
   // create a buffer to read into that matches the data alignment
@@ -109,19 +119,24 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 }
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
-                                 EventCenter *c, PerfCounters *p)
+                                 Worker *w)
   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
-    logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
+    logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
     out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
     dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
-    recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
-    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
+    recv_start(0), recv_end(0),
+    last_active(ceph::coarse_mono_clock::now()),
+    inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
+    got_bad_auth(false), authorizer(NULL), replacing(false),
+    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
+    worker(w), center(&w->center)
 {
   read_handler = new C_handle_read(this);
   write_handler = new C_handle_write(this);
   wakeup_handler = new C_time_wakeup(this);
+  tick_handler = new C_tick_wakeup(this);
   memset(msgvec, 0, sizeof(msgvec));
   // double recv_max_prefetch; see "read_until"
   recv_buf = new char[2*recv_max_prefetch];
@@ -459,6 +474,7 @@ void AsyncConnection::process()
   int prev_state = state;
   bool already_dispatch_writer = false;
   Mutex::Locker l(lock);
+  last_active = ceph::coarse_mono_clock::now();
   do {
     ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
     prev_state = state;
@@ -1288,9 +1304,15 @@ ssize_t AsyncConnection::_process_connection()
         session_security.reset();
       }
 
+      if (delay_state)
+        assert(delay_state->ready());
      dispatch_queue->queue_connect(this);
       async_msgr->ms_deliver_handle_fast_connect(this);
 
+      // make sure there is no pending tick timer before arming a new one
+      center->delete_time_event(last_tick_id);
+      last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+
       // messages may be queued between the last _try_send and connection ready;
       // the write event may already have been notified, so force the scheduler again
       write_lock.Lock();
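
The hunks above give each connection an inactivity watchdog: process() refreshes last_active on every pass, and a one-shot time event re-arms itself until the idle period exceeds inactive_timeout_us (see tick() later in this patch). A minimal standalone sketch of that re-arm-or-fault pattern, assuming hypothetical schedule/fault hooks in place of EventCenter and AsyncConnection:

    #include <chrono>
    #include <cstdint>
    #include <functional>

    // Sketch only: schedule/fault are hypothetical hooks the embedder wires up.
    struct IdleWatchdog {
      using Clock = std::chrono::steady_clock;
      Clock::time_point last_active = Clock::now();
      uint64_t timeout_us;
      std::function<void()> fault;                            // tear the session down
      std::function<void(std::function<void()>)> schedule;    // one-shot timer after timeout_us

      explicit IdleWatchdog(uint64_t us) : timeout_us(us) {}

      // process() analogue: any traffic refreshes the activity stamp
      void touch() { last_active = Clock::now(); }

      // tick() analogue: fault if idle too long, otherwise re-arm
      void tick() {
        auto idle = std::chrono::duration_cast<std::chrono::microseconds>(
            Clock::now() - last_active).count();
        if ((uint64_t)idle > timeout_us)
          fault();
        else
          schedule([this] { tick(); });
      }
    };
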
@@ -1458,6 +1480,13 @@ ssize_t AsyncConnection::_process_connection()
         ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
         state = STATE_OPEN;
         memset(&connect_msg, 0, sizeof(connect_msg));
+
+        if (delay_state)
+          assert(delay_state->ready());
+        // make sure there is no pending tick timer before arming a new one
+        center->delete_time_event(last_tick_id);
+        last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+
         write_lock.Lock();
         can_write = WriteStatus::CANWRITE;
         if (is_queued())
@@ -1798,17 +1827,24 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
       assert(recv_start == recv_end);
 
       existing->write_lock.Unlock();
-      if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
-        // handle error
-        ldout(async_msgr->cct, 0) << __func__ << " reply fault for existing connection." << dendl;
-        existing->fault();
-      }
-
       ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
       _stop();
       // queue a reset on the new connection, which we're dumping for the old
       dispatch_queue->queue_reset(this);
+      int new_fd = existing->sd;
+      center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+        Mutex::Locker l(existing->lock);
+        if (new_fd != existing->sd)
+          return ;
+
+        if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
+          // handle error
+          existing->fault();
+        }
+      }, true);
       existing->lock.Unlock();
+
       return 0;
     }
     existing->lock.Unlock();
@@ -2202,6 +2238,7 @@ void AsyncConnection::_stop()
   dispatch_queue->discard_queue(conn_id);
   discard_out_queue();
   async_msgr->unregister_conn(this);
+  worker->release_worker();
 
   state = STATE_CLOSED;
   open_write = false;
@@ -2212,9 +2249,6 @@ void AsyncConnection::_stop()
     ::close(sd);
   }
   sd = -1;
-  for (set<uint64_t>::iterator it = register_time_events.begin();
-       it != register_time_events.end(); ++it)
-    center->delete_time_event(*it);
 
   // Make sure in-queue events will be processed
   center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
@@ -2402,6 +2436,8 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
 {
   Mutex::Locker l(delay_lock);
   register_time_events.erase(id);
+  if (stop_dispatch)
+    return ;
   if (delay_queue.empty())
     return ;
   utime_t release = delay_queue.front().first;
@@ -2422,16 +2458,11 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
   }
 }
 
-class C_flush_messages : public EventCallback {
-  std::deque<std::pair<utime_t, Message*> > delay_queue;
-  AsyncMessenger *msgr;
-  DispatchQueue *dispatch_queue;
-  uint64_t conn_id;
- public:
-  C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
-                   DispatchQueue *dq, uint64_t cid)
-    : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
-  void do_request(int id) {
+void AsyncConnection::DelayedDelivery::flush() {
+  stop_dispatch = true;
+  center->submit_to(
+      center->get_id(), [this] () mutable {
+    Mutex::Locker l(delay_lock);
     while (!delay_queue.empty()) {
       Message *m = delay_queue.front().second;
       if (msgr->ms_can_fast_dispatch(m)) {
@@ -2441,17 +2472,11 @@ class C_flush_messages : public EventCallback {
       }
       delay_queue.pop_front();
     }
-    delete this;
-  }
-};
-
-void AsyncConnection::DelayedDelivery::flush() {
-  Mutex::Locker l(delay_lock);
-  center->dispatch_event_external(
-      new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
-  for (auto i : register_time_events)
-    center->delete_time_event(i);
-  register_time_events.clear();
+    for (auto i : register_time_events)
+      center->delete_time_event(i);
+    register_time_events.clear();
+    stop_dispatch = false;
+  }, true);
 }
 
 void AsyncConnection::send_keepalive()
@@ -2471,12 +2496,6 @@ void AsyncConnection::mark_down()
   _stop();
 }
 
-void AsyncConnection::release_worker()
-{
-  if (msgr)
-    reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center);
-}
-
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 {
   assert(write_lock.is_locked());
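
Note how the RETRY_GLOBAL reply is bounced to the thread that owns existing: the lambda captures the fd observed at submit time and bails out if it no longer matches, guarding against the socket being closed (and the fd number reused) before the deferred task runs. A self-contained sketch of that guard; Conn and post_to_owner are illustrative stand-ins, not the Ceph API:

    #include <functional>
    #include <mutex>

    struct Conn {
      std::mutex lock;
      int sd = 7;                           // current socket fd; changes on close/replace
      void reply() { /* send the retry reply in the real code */ }
    };

    // hypothetical helper: run fn later on the thread that owns c (inline here)
    void post_to_owner(Conn *c, std::function<void()> fn) { fn(); }

    void deferred_reply(Conn *c) {
      int seen_fd = c->sd;                  // snapshot before deferring
      post_to_owner(c, [c, seen_fd] {
        std::lock_guard<std::mutex> l(c->lock);
        if (seen_fd != c->sd)
          return;                           // fd changed: connection replaced, drop the task
        c->reply();
      });
    }
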
@@ -2587,3 +2606,21 @@ void AsyncConnection::wakeup_from(uint64_t id)
   lock.Unlock();
   process();
 }
+
+void AsyncConnection::tick(uint64_t id)
+{
+  auto now = ceph::coarse_mono_clock::now();
+  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
+                             << " last_active=" << last_active << dendl;
+  assert(last_tick_id == id);
+  Mutex::Locker l(lock);
+  auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
+  if (inactive_timeout_us < (uint64_t)idle_period) {
+    ldout(async_msgr->cct, 1) << __func__ << " idle for " << idle_period
+                              << " us, more than " << inactive_timeout_us
+                              << " us; marking self as faulted." << dendl;
+    fault();
+  } else if (is_connected()) {
+    last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+  }
+}
diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h
index af1747951c0c2..5769827b9d720 100644
--- a/src/msg/async/AsyncConnection.h
+++ b/src/msg/async/AsyncConnection.h
@@ -26,6 +26,7 @@ using namespace std;
 
 #include "auth/AuthSessionHandler.h"
+#include "common/ceph_time.h"
 #include "common/Mutex.h"
 #include "common/perf_counters.h"
 #include "include/buffer.h"
@@ -36,6 +37,7 @@ using namespace std;
 #include "net_handler.h"
 
 class AsyncMessenger;
+class Worker;
 
 static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
@@ -142,12 +144,14 @@ class AsyncConnection : public Connection {
     EventCenter *center;
     DispatchQueue *dispatch_queue;
     uint64_t conn_id;
+    std::atomic_bool stop_dispatch;
 
    public:
     explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
                              DispatchQueue *q, uint64_t cid)
       : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
-        msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
+        msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
+        stop_dispatch(false) { }
     ~DelayedDelivery() {
       assert(register_time_events.empty());
       assert(delay_queue.empty());
@@ -159,22 +163,27 @@ class AsyncConnection : public Connection {
       register_time_events.insert(center->create_time_event(delay_period*1000000, this));
     }
     void discard() {
-      Mutex::Locker l(delay_lock);
-      while (!delay_queue.empty()) {
-        Message *m = delay_queue.front().second;
-        dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
-        m->put();
-        delay_queue.pop_front();
-      }
-      for (auto i : register_time_events)
-        center->delete_time_event(i);
-      register_time_events.clear();
+      stop_dispatch = true;
+      center->submit_to(center->get_id(), [this] () mutable {
+        Mutex::Locker l(delay_lock);
+        while (!delay_queue.empty()) {
+          Message *m = delay_queue.front().second;
+          dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
+          m->put();
+          delay_queue.pop_front();
+        }
+        for (auto i : register_time_events)
+          center->delete_time_event(i);
+        register_time_events.clear();
+        stop_dispatch = false;
+      }, true);
     }
+    bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
     void flush();
   } *delay_state;
 
 public:
-  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, EventCenter *c, PerfCounters *p);
+  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
   ~AsyncConnection();
 
   void maybe_start_delay_thread();
@@ -202,8 +211,6 @@ class AsyncConnection : public Connection {
     policy.lossy = true;
   }
 
-  void release_worker();
-
 private:
   enum {
     STATE_NONE,
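
discard() and flush() now hop onto the owning center's thread; stop_dispatch parks new timer callbacks while a drain is queued, and ready() lets the connect/accept paths assert nothing is still in flight. A toy sketch of the drain pattern under those assumptions (run_on_owner stands in for EventCenter::submit_to):

    #include <atomic>
    #include <deque>
    #include <functional>

    struct DelayedQueue {
      std::atomic<bool> stop_dispatch{false};
      std::deque<int> delay_queue;                  // stands in for queued Messages

      // run_on_owner stands in for submit_to(center->get_id(), ..., true)
      void run_on_owner(std::function<void()> fn) { fn(); }

      void discard() {
        stop_dispatch = true;        // timer callbacks observe this and return early
        run_on_owner([this] {        // the queue is only touched on the owner thread
          delay_queue.clear();
          stop_dispatch = false;
        });
      }

      // safe to hand the connection out only when no drain or queued work remains
      bool ready() const { return !stop_dispatch && delay_queue.empty(); }
    };
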
@@ -312,12 +319,16 @@ class AsyncConnection : public Connection {
   EventCallbackRef read_handler;
   EventCallbackRef write_handler;
   EventCallbackRef wakeup_handler;
+  EventCallbackRef tick_handler;
   struct iovec msgvec[ASYNC_IOV_MAX];
   char *recv_buf;
   uint32_t recv_max_prefetch;
   uint32_t recv_start;
   uint32_t recv_end;
   set<uint64_t> register_time_events; // need to delete them on stop
+  ceph::coarse_mono_clock::time_point last_active;
+  uint64_t last_tick_id = 0;
+  const uint64_t inactive_timeout_us;
 
   // This section holds temp variables used by state transition
 
@@ -352,6 +363,7 @@ class AsyncConnection : public Connection {
   // used only by "read_until"
   uint64_t state_offset;
   NetHandler net;
+  Worker *worker;
   EventCenter *center;
   ceph::shared_ptr<AuthSessionHandler> session_security;
@@ -366,6 +378,7 @@ class AsyncConnection : public Connection {
   void handle_write();
   void process();
   void wakeup_from(uint64_t id);
+  void tick(uint64_t id);
   void local_deliver();
   void stop(bool queue_reset) {
     lock.Lock();
@@ -376,9 +389,14 @@ class AsyncConnection : public Connection {
     dispatch_queue->queue_reset(this);
   }
   void cleanup_handler() {
+    for (auto &&t : register_time_events)
+      center->delete_time_event(t);
+    register_time_events.clear();
+    center->delete_time_event(last_tick_id);
     delete read_handler;
     delete write_handler;
     delete wakeup_handler;
+    delete tick_handler;
     if (delay_state) {
       delete delay_state;
       delay_state = NULL;
diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc
index 50d48c3ecb0d6..1a5e22aebd2fd 100644
--- a/src/msg/async/AsyncMessenger.cc
+++ b/src/msg/async/AsyncMessenger.cc
@@ -50,48 +50,6 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
 }
 
-class Worker : public Thread {
-  static const uint64_t InitEventNumber = 5000;
-  static const uint64_t EventMaxWaitUs = 30000000;
-  CephContext *cct;
-  WorkerPool *pool;
-  bool done;
-  int id;
-  PerfCounters *perf_logger;
-
- public:
-  EventCenter center;
-  std::atomic_uint references;
-  Worker(CephContext *c, WorkerPool *p, int i)
-    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
-    center.init(InitEventNumber);
-    char name[128];
-    sprintf(name, "AsyncMessenger::Worker-%d", id);
-    // initialize perf_logger
-    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
-
-    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
-    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
-    plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
-    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
-    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
-
-    perf_logger = plb.create_perf_counters();
-    cct->get_perfcounters_collection()->add(perf_logger);
-  }
-  ~Worker() {
-    if (perf_logger) {
-      cct->get_perfcounters_collection()->remove(perf_logger);
-      delete perf_logger;
-    }
-  }
-  void *entry();
-  void stop();
-  PerfCounters *get_perf_counter() { return perf_logger; }
-};
-
 /*******************
  * Processor
  */
@@ -357,7 +315,6 @@ class WorkerPool {
   virtual ~WorkerPool();
   void start();
   Worker *get_worker();
-  void release_worker(EventCenter* c);
   int get_cpuid(int id) {
     if (coreids.empty())
      return -1;
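
cleanup_handler() now cancels every outstanding time event, including the tick timer, before freeing the callbacks those events point at; doing it in the other order could let a timer fire into deleted memory. A toy sketch of that teardown ordering (the Timers/Handler names are illustrative):

    #include <cstdint>
    #include <map>
    #include <set>

    struct Handler { };                      // stands in for an EventCallback

    struct Timers {                          // toy timer registry
      std::map<uint64_t, Handler*> armed;    // id -> callback that would fire
      void cancel(uint64_t id) { armed.erase(id); }
    };

    void cleanup(Timers &timers, std::set<uint64_t> &registered,
                 uint64_t tick_id, Handler *&tick_handler) {
      // 1) cancel timers first, so nothing can fire into freed memory
      for (uint64_t id : registered)
        timers.cancel(id);
      registered.clear();
      timers.cancel(tick_id);
      // 2) only now is it safe to free the callback the tick timer referenced
      delete tick_handler;
      tick_handler = nullptr;
    }
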
@@ -495,27 +452,10 @@ Worker* WorkerPool::get_worker()
   return current_best;
 }
 
-void WorkerPool::release_worker(EventCenter* c)
-{
-  ldout(cct, 10) << __func__ << dendl;
-  simple_spin_lock(&pool_spin);
-  for (auto p = workers.begin(); p != workers.end(); ++p) {
-    if (&((*p)->center) == c) {
-      ldout(cct, 10) << __func__ << " found worker, releasing" << dendl;
-      int oldref = (*p)->references.fetch_sub(1);
-      assert(oldref > 0);
-      break;
-    }
-  }
-  simple_spin_unlock(&pool_spin);
-}
-
 void WorkerPool::barrier()
 {
   ldout(cct, 10) << __func__ << " started." << dendl;
-  pthread_t cur = pthread_self();
   for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
-    assert(cur != (*it)->center.get_owner());
     barrier_count.inc();
     (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
   }
@@ -545,8 +485,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
   ceph_spin_init(&global_seq_lock);
   cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
   local_worker = pool->get_worker();
-  local_connection = new AsyncConnection(
-      cct, this, &dispatch_queue, &local_worker->center, local_worker->get_perf_counter());
+  local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
   local_features = features;
   init_local_connection();
   reap_handler = new C_handle_reap(this);
@@ -680,7 +619,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd)
 {
   lock.Lock();
   Worker *w = pool->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
   conn->accept(sd);
   accepting_conns.insert(conn);
   lock.Unlock();
@@ -697,7 +636,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
 
   // create connection
   Worker *w = pool->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
   conn->connect(addr, type);
   assert(!conns.count(addr));
   conns[addr] = conn;
@@ -860,10 +799,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
 Connection *AsyncMessenger::create_anon_connection() {
   Mutex::Locker l(lock);
   Worker *w = pool->get_worker();
-  return new AsyncConnection(cct,
-                             this,
-                             &dispatch_queue,
-                             &w->center, w->get_perf_counter());
+  return new AsyncConnection(cct, this, &dispatch_queue, w);
 }
 
 int AsyncMessenger::get_proto_version(int peer_type, bool connect)
@@ -893,16 +829,6 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect)
   return 0;
 }
 
-void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) {
-  Mutex::Locker l(deleted_lock);
-  conn->release_worker();
-  deleted_conns.insert(conn);
-
-  if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
-    local_worker->center.dispatch_event_external(reap_handler);
-  }
-}
-
 void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 {
   // be careful here: multiple threads may block here, and readers of
@@ -947,7 +873,3 @@ int AsyncMessenger::reap_dead()
 
   return num;
 }
-
-void AsyncMessenger::release_worker(EventCenter* c) {
-  pool->release_worker(c);
-}
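
With release_worker() moved onto Worker itself, a connection drops its worker reference directly instead of asking the pool to scan for the center it belongs to under a spinlock. A sketch of the pick-least-loaded/release pair, assuming a simplified SketchWorker in place of the real Worker:

    #include <atomic>
    #include <cassert>
    #include <vector>

    struct SketchWorker {
      std::atomic<unsigned> references{0};
      void release() {                       // mirrors Worker::release_worker()
        unsigned old = references.fetch_sub(1);
        assert(old > 0);
        (void)old;
      }
    };

    // pick the least-referenced worker and take a reference (pool assumed non-empty)
    SketchWorker* get_worker(std::vector<SketchWorker*> &workers) {
      SketchWorker *best = workers.front();
      for (auto *w : workers)
        if (w->references < best->references)
          best = w;
      best->references++;
      return best;
    }

The connection keeps its Worker pointer and calls release() exactly once when it stops, so teardown needs no pool-wide scan.
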
diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h
index 61f8f3b76d128..9ac8a1a5c66b8 100644
--- a/src/msg/async/AsyncMessenger.h
+++ b/src/msg/async/AsyncMessenger.h
@@ -56,7 +56,51 @@ enum {
 };
 
-class Worker;
+class Worker : public Thread {
+  static const uint64_t InitEventNumber = 5000;
+  static const uint64_t EventMaxWaitUs = 30000000;
+  CephContext *cct;
+  WorkerPool *pool;
+  bool done;
+  int id;
+  PerfCounters *perf_logger;
+
+ public:
+  EventCenter center;
+  std::atomic_uint references;
+  Worker(CephContext *c, WorkerPool *p, int i)
+    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
+    center.init(InitEventNumber, i);
+    char name[128];
+    sprintf(name, "AsyncMessenger::Worker-%d", id);
+    // initialize perf_logger
+    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
+
+    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
+    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
+    plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
+    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
+    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes");
+    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
+    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
+
+    perf_logger = plb.create_perf_counters();
+    cct->get_perfcounters_collection()->add(perf_logger);
+  }
+  ~Worker() {
+    if (perf_logger) {
+      cct->get_perfcounters_collection()->remove(perf_logger);
+      delete perf_logger;
+    }
+  }
+  void *entry();
+  void stop();
+  PerfCounters *get_perf_counter() { return perf_logger; }
+  void release_worker() {
+    int oldref = references.fetch_sub(1);
+    assert(oldref > 0);
+  }
+};
 
 /**
  * If the Messenger binds to a specific address, the Processor runs
@@ -444,7 +488,14 @@ class AsyncMessenger : public SimplePolicyMessenger {
    *
    * See "deleted_conns"
    */
-  void unregister_conn(AsyncConnectionRef conn);
+  void unregister_conn(AsyncConnectionRef conn) {
+    Mutex::Locker l(deleted_lock);
+    deleted_conns.insert(conn);
+
+    if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
+      local_worker->center.dispatch_event_external(reap_handler);
+    }
+  }
 
   /**
    * Reap dead connection from `deleted_conns`
@@ -454,8 +505,6 @@ class AsyncMessenger : public SimplePolicyMessenger {
    * See "deleted_conns"
    */
   int reap_dead();
-
-  void release_worker(EventCenter* c);
 
   /**
    * @} // AsyncMessenger Internals
diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc
index eb20406774e7d..5e436d5a20c7b 100644
--- a/src/msg/async/Event.cc
+++ b/src/msg/async/Event.cc
@@ -14,8 +14,6 @@
  *
  */
 
-#include <time.h>
-
 #include "common/errno.h"
 #include "Event.h"
@@ -57,16 +55,17 @@ class C_handle_notify : public EventCallback {
 
 ostream& EventCenter::_event_prefix(std::ostream *_dout)
 {
-  return *_dout << "Event(" << this << " owner=" << get_owner() << " nevent=" << nevent
+  return *_dout << "Event(" << this << " nevent=" << nevent
                 << " time_id=" << time_event_next_id << ").";
 }
 
-static thread_local pthread_t thread_id = 0;
-
-int EventCenter::init(int n)
+int EventCenter::init(int n, unsigned i)
 {
   // can't init multiple times
   assert(nevent == 0);
+
+  idx = i;
+
 #ifdef HAVE_EPOLL
   driver = new EpollDriver(cct);
 #else
@@ -117,7 +116,7 @@ int EventCenter::init(int n)
 EventCenter::~EventCenter()
 {
   {
-    Mutex::Locker l(external_lock);
+    std::lock_guard<std::mutex> l(external_lock);
     while (!external_events.empty()) {
       EventCallbackRef e = external_events.front();
       if (e)
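
unregister_conn() no longer touches the worker; it just parks the dead connection in deleted_conns and, once a threshold is crossed, schedules a batch reap on the local worker's event loop. A toy sketch of that threshold-batched reclamation (names are illustrative; reap_cb stands in for dispatching reap_handler):

    #include <functional>
    #include <mutex>
    #include <set>

    struct Reaper {
      static const size_t kThreshold = 5;          // ReapDeadConnectionThreshold analogue
      std::mutex lock;
      std::set<int> dead;                          // ids of dead connections
      std::function<void()> reap_cb = [] {};       // would dispatch the reap event

      void unregister(int id) {
        std::lock_guard<std::mutex> l(lock);
        dead.insert(id);
        if (dead.size() >= kThreshold)
          reap_cb();                               // reap in batches, off the hot path
      }
    };
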
@@ -141,13 +140,18 @@ EventCenter::~EventCenter()
 
 void EventCenter::set_owner()
 {
-  thread_id = owner = pthread_self();
+  cct->lookup_or_create_singleton_object<AssociatedCenters>(
+      global_centers, "AsyncMessenger::EventCenter::global_center");
+  assert(global_centers && !global_centers->centers[idx]);
+  global_centers->centers[idx] = this;
+  owner = pthread_self();
+  ldout(cct, 1) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
 }
 
 int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 {
   int r = 0;
-  Mutex::Locker l(file_lock);
+  std::lock_guard<std::mutex> l(file_lock);
   if (fd >= nevent) {
     int new_size = nevent << 2;
     while (fd > new_size)
@@ -192,7 +196,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 void EventCenter::delete_file_event(int fd, int mask)
 {
   assert(fd >= 0);
-  Mutex::Locker l(file_lock);
+  std::lock_guard<std::mutex> l(file_lock);
   if (fd >= nevent) {
     ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
                   << " mask=" << mask << dendl;
@@ -224,7 +228,7 @@ void EventCenter::delete_file_event(int fd, int mask)
 
 uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
 {
-  Mutex::Locker l(time_lock);
+  assert(in_thread());
   uint64_t id = time_event_next_id++;
 
   ldout(cct, 10) << __func__ << " id=" << id << " trigger after " << microseconds << "us" << dendl;
@@ -232,32 +236,27 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
   clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds);
   event.id = id;
   event.time_cb = ctxt;
-  time_events[expire].push_back(event);
-  if (expire < next_time)
-    wakeup();
+  std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event);
+  auto it = time_events.insert(std::move(s_val));
+  event_map[id] = it;
 
   return id;
 }
 
-// TODO: Ineffective implementation now!
 void EventCenter::delete_time_event(uint64_t id)
 {
-  Mutex::Locker l(time_lock);
+  assert(in_thread());
   ldout(cct, 10) << __func__ << " id=" << id << dendl;
   if (id >= time_event_next_id)
     return ;
 
-  for (auto it = time_events.begin(); it != time_events.end(); ++it) {
-    for (list<TimeEvent>::iterator j = it->second.begin();
-         j != it->second.end(); ++j) {
-      if (j->id == id) {
-        it->second.erase(j);
-        if (it->second.empty())
-          time_events.erase(it);
-        return ;
-      }
-    }
+  auto it = event_map.find(id);
+  if (it == event_map.end()) {
+    ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl;
+    return ;
   }
+
+  time_events.erase(it->second);
+  event_map.erase(it);
 }
 
 void EventCenter::wakeup()
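
Time events move from a map of lists to an expiry-ordered multimap plus an id-to-iterator index, so cancellation becomes a couple of O(log n) lookups instead of a full scan; and since both containers are now touched only from the owner thread (assert(in_thread())), time_lock can disappear. A self-contained sketch of the two-index registry, stripped of callbacks and logging:

    #include <chrono>
    #include <cstdint>
    #include <map>

    using Clock = std::chrono::steady_clock;

    struct TimerRegistry {
      std::multimap<Clock::time_point, uint64_t> by_expiry;   // expiry -> id
      std::map<uint64_t, std::multimap<Clock::time_point, uint64_t>::iterator> by_id;
      uint64_t next_id = 1;

      uint64_t arm(Clock::duration after) {
        uint64_t id = next_id++;
        auto it = by_expiry.emplace(Clock::now() + after, id);
        by_id[id] = it;           // remember where it sits for fast cancel
        return id;
      }

      void cancel(uint64_t id) {
        auto it = by_id.find(id);
        if (it == by_id.end())
          return;                 // already fired or cancelled
        by_expiry.erase(it->second);
        by_id.erase(it);          // erase both indexes together
      }
    };
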
@@ -279,34 +278,17 @@ int EventCenter::process_time_events()
   clock_type::time_point now = clock_type::now();
   ldout(cct, 10) << __func__ << " cur time is " << now << dendl;
 
-  Mutex::Locker l(time_lock);
-  /* If the system clock is moved to the future, and then set back to the
-   * right value, time events may be delayed in a random way. Often this
-   * means that scheduled operations will not be performed soon enough.
-   *
-   * Here we try to detect system clock skews, and force all the time
-   * events to be processed ASAP when this happens: the idea is that
-   * processing events earlier is less dangerous than delaying them
-   * indefinitely, and practice suggests it is.
-   */
-  bool clock_skewed = now < last_time;
-  last_time = now;
-
   while (!time_events.empty()) {
     auto it = time_events.begin();
-    if (now >= it->first || clock_skewed) {
-      if (it->second.empty()) {
-        time_events.erase(it);
-      } else {
-        TimeEvent &e = it->second.front();
-        EventCallbackRef cb = e.time_cb;
-        uint64_t id = e.id;
-        it->second.pop_front();
-        ldout(cct, 10) << __func__ << " process time event: id=" << id << dendl;
-        processed++;
-        time_lock.Unlock();
-        cb->do_request(id);
-        time_lock.Lock();
-      }
+    if (now >= it->first) {
+      TimeEvent &e = it->second;
+      EventCallbackRef cb = e.time_cb;
+      uint64_t id = e.id;
+      time_events.erase(it);
+      event_map.erase(id);
+      ldout(cct, 10) << __func__ << " process time event: id=" << id << dendl;
+      processed++;
+      cb->do_request(id);
     } else {
       break;
     }
@@ -317,25 +299,21 @@ int EventCenter::process_time_events()
 
 int EventCenter::process_events(int timeout_microseconds)
 {
-  // Must set owner before looping
-  assert(owner);
   struct timeval tv;
   int numevents;
   bool trigger_time = false;
   auto now = clock_type::now();
 
   // if external events exist, don't block
-  if (external_num_events.read()) {
+  if (external_num_events.load()) {
     tv.tv_sec = 0;
     tv.tv_usec = 0;
-    next_time = now;
   } else {
     clock_type::time_point shortest;
     shortest = now + std::chrono::microseconds(timeout_microseconds);
 
-    Mutex::Locker l(time_lock);
     auto it = time_events.begin();
-    if (it != time_events.end() && shortest > it->first) {
+    if (it != time_events.end() && shortest >= it->first) {
       ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
       shortest = it->first;
       trigger_time = true;
@@ -349,13 +327,12 @@ int EventCenter::process_events(int timeout_microseconds)
     }
     tv.tv_sec = timeout_microseconds / 1000000;
     tv.tv_usec = timeout_microseconds % 1000000;
-    next_time = shortest;
   }
 
   ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
   vector<FiredFileEvent> fired_events;
   numevents = driver->event_wait(fired_events, &tv);
-  file_lock.Lock();
+  file_lock.lock();
   for (int j = 0; j < numevents; j++) {
     int rfired = 0;
     FileEvent *event;
@@ -370,36 +347,36 @@ int EventCenter::process_events(int timeout_microseconds)
     if (event->mask & fired_events[j].mask & EVENT_READABLE) {
       rfired = 1;
       cb = event->read_cb;
-      file_lock.Unlock();
+      file_lock.unlock();
       cb->do_request(fired_events[j].fd);
-      file_lock.Lock();
+      file_lock.lock();
     }
 
     if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
       if (!rfired || event->read_cb != event->write_cb) {
         cb = event->write_cb;
-        file_lock.Unlock();
+        file_lock.unlock();
         cb->do_request(fired_events[j].fd);
-        file_lock.Lock();
+        file_lock.lock();
       }
     }
 
     ldout(cct, 20) << __func__ << " event_wq process is " << fired_events[j].fd
                    << " mask is " << fired_events[j].mask << dendl;
   }
-  file_lock.Unlock();
+  file_lock.unlock();
 
   if (trigger_time)
     numevents += process_time_events();
 
-  if (external_num_events.read()) {
-    external_lock.Lock();
+  if (external_num_events.load()) {
+    external_lock.lock();
     if (external_events.empty()) {
-      external_lock.Unlock();
+      external_lock.unlock();
     } else {
       deque<EventCallbackRef> cur_process;
       cur_process.swap(external_events);
-      external_num_events.set(0);
-      external_lock.Unlock();
+      external_num_events.store(0);
+      external_lock.unlock();
       while (!cur_process.empty()) {
         EventCallbackRef e = cur_process.front();
         if (e)
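
External events are drained by swapping the whole deque out under the lock, then running the callbacks lock-free, so producers are blocked only for an O(1) swap. A self-contained sketch of that pattern:

    #include <deque>
    #include <functional>
    #include <mutex>

    // swap-and-drain, as in process_events(): hold the lock only for the swap,
    // run callbacks unlocked.
    struct ExternalQueue {
      std::mutex lock;
      std::deque<std::function<void()>> events;

      void drain() {
        std::deque<std::function<void()>> cur;
        {
          std::lock_guard<std::mutex> l(lock);
          cur.swap(events);          // producers can keep enqueueing immediately
        }
        for (auto &cb : cur)
          cb();                      // may itself enqueue more external events
      }
    };
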
@@ -414,11 +391,11 @@ int EventCenter::process_events(int timeout_microseconds)
 
 void EventCenter::dispatch_event_external(EventCallbackRef e)
 {
-  external_lock.Lock();
+  external_lock.lock();
   external_events.push_back(e);
-  uint64_t num = external_num_events.inc();
-  external_lock.Unlock();
-  if (thread_id != owner)
+  uint64_t num = ++external_num_events;
+  external_lock.unlock();
+  if (!in_thread())
     wakeup();
 
   ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl;
diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h
index 4618c156b0d92..7375e6d9fa501 100644
--- a/src/msg/async/Event.h
+++ b/src/msg/async/Event.h
@@ -37,13 +37,12 @@
 #endif
 #endif
 
-#include <pthread.h>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
 
-#include "include/atomic.h"
-#include "include/Context.h"
-#include "include/unordered_map.h"
 #include "common/ceph_time.h"
-#include "common/WorkQueue.h"
+#include "common/dout.h"
 #include "net_handler.h"
 
 #define EVENT_NONE 0
@@ -81,12 +80,22 @@ class EventDriver {
   virtual int resize_events(int newsize) = 0;
 };
 
-
 /*
  * EventCenter maintains a set of file descriptors and handles registered events.
  */
 class EventCenter {
+  using clock_type = ceph::coarse_mono_clock;
+  // should be enough
+  static const int MAX_EVENTCENTER = 24;
+
+  struct AssociatedCenters {
+    EventCenter *centers[MAX_EVENTCENTER];
+    AssociatedCenters(CephContext *c) {
+      memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
+    }
+  };
+
   struct FileEvent {
     int mask;
     EventCallbackRef read_cb;
@@ -104,20 +113,21 @@ class EventCenter {
   CephContext *cct;
   int nevent;
   // used only for external events
-  Mutex external_lock, file_lock, time_lock;
-  atomic_t external_num_events;
+  pthread_t owner;
+  std::mutex external_lock, file_lock;
+  std::atomic_ulong external_num_events;
   deque<EventCallbackRef> external_events;
   vector<FileEvent> file_events;
   EventDriver *driver;
-  map<clock_type::time_point, list<TimeEvent> > time_events;
+  std::multimap<clock_type::time_point, TimeEvent> time_events;
+  std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
   uint64_t time_event_next_id;
-  clock_type::time_point last_time;  // last time process time event
-  clock_type::time_point next_time;  // next wake up time
   int notify_receive_fd;
   int notify_send_fd;
   NetHandler net;
-  pthread_t owner;
   EventCallbackRef notify_handler;
+  unsigned idx = 10000;
+  AssociatedCenters *global_centers;
 
   int process_time_events();
   FileEvent *_get_file_event(int fd) {
@@ -130,22 +140,18 @@ class EventCenter {
  public:
   explicit EventCenter(CephContext *c):
     cct(c), nevent(0),
-    external_lock("AsyncMessenger::external_lock"),
-    file_lock("AsyncMessenger::file_lock"),
-    time_lock("AsyncMessenger::time_lock"),
     external_num_events(0),
     driver(NULL), time_event_next_id(1),
-    notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0),
+    notify_receive_fd(-1), notify_send_fd(-1), net(c),
     notify_handler(NULL), already_wakeup(0) {
-    last_time = clock_type::now();
   }
   ~EventCenter();
   ostream& _event_prefix(std::ostream *_dout);
 
-  int init(int nevent);
+  int init(int nevent, unsigned idx);
   void set_owner();
-  pthread_t get_owner() { return owner; }
+  unsigned get_id() { return idx; }
 
   // Used by internal thread
   int create_file_event(int fd, int mask, EventCallbackRef ctxt);
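
dispatch_event_external() replaces the old thread_local owner check with pthread_equal() via in_thread(), and only kicks the notify pipe when posting from a foreign thread; a post from the loop's own thread is drained before the next blocking wait anyway. A sketch of that rule with a hypothetical Loop type:

    #include <deque>
    #include <functional>
    #include <mutex>
    #include <pthread.h>

    // wakeup() stands in for writing to the notify pipe.
    struct Loop {
      pthread_t owner = pthread_self();
      std::mutex lock;
      std::deque<std::function<void()>> pending;

      bool in_thread() const { return pthread_equal(pthread_self(), owner); }
      void wakeup() { /* write a byte to the notify fd in the real code */ }

      void post(std::function<void()> fn) {
        {
          std::lock_guard<std::mutex> l(lock);
          pending.push_back(std::move(fn));
        }
        if (!in_thread())
          wakeup();   // the loop may be blocked in poll; kick it
        // posting from the loop's own thread needs no wakeup: the queue is
        // drained before the next blocking wait
      }
    };
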
@@ -157,6 +163,57 @@ class EventCenter {
 
   // Used by external thread
   void dispatch_event_external(EventCallbackRef e);
+  inline bool in_thread() const {
+    return pthread_equal(pthread_self(), owner);
+  }
+
+ private:
+  template <typename func>
+  class C_submit_event : public EventCallback {
+    std::mutex lock;
+    std::condition_variable cond;
+    bool done = false;
+    func f;
+    bool nonwait;
+   public:
+    C_submit_event(func &&_f, bool nw)
+      : f(std::move(_f)), nonwait(nw) {}
+    void do_request(int id) {
+      f();
+      lock.lock();
+      cond.notify_all();
+      done = true;
+      lock.unlock();
+      if (nonwait)
+        delete this;
+    }
+    void wait() {
+      assert(!nonwait);
+      std::unique_lock<std::mutex> l(lock);
+      while (!done)
+        cond.wait(l);
+    }
+  };
+
+ public:
+  template <typename func>
+  void submit_to(int i, func &&f, bool nowait = false) {
+    assert(i < MAX_EVENTCENTER && global_centers);
+    EventCenter *c = global_centers->centers[i];
+    assert(c);
+    if (!nowait && c->in_thread()) {
+      f();
+      return ;
+    }
+    if (nowait) {
+      C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
+      c->dispatch_event_external(event);
+    } else {
+      C_submit_event<func> event(std::move(f), false);
+      c->dispatch_event_external(&event);
+      event.wait();
+    }
+  };
 };
 
 #endif
diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc
index 59ac8bf6a3396..adcbb99160e8f 100644
--- a/src/test/msgr/test_async_driver.cc
+++ b/src/test/msgr/test_async_driver.cc
@@ -25,6 +25,8 @@
 #include <gtest/gtest.h>
 #include "include/Context.h"
 #include "include/atomic.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
 #include "msg/async/Event.h"
@@ -253,7 +255,7 @@ class FakeEvent : public EventCallback {
 TEST(EventCenterTest, FileEventExpansion) {
   vector<int> sds;
   EventCenter center(g_ceph_context);
-  center.init(100);
+  center.init(100, 0);
   EventCallbackRef e(new FakeEvent());
   for (int i = 0; i < 300; i++) {
     int sd = ::socket(AF_INET, SOCK_STREAM, 0);
@@ -272,8 +274,8 @@ class Worker : public Thread {
 
  public:
   EventCenter center;
-  explicit Worker(CephContext *c): cct(c), done(false), center(c) {
-    center.init(100);
+  explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
+    center.init(100, idx);
   }
   void stop() {
     done = true;
@@ -303,7 +305,7 @@ class CountEvent: public EventCallback {
 };
 
 TEST(EventCenterTest, DispatchTest) {
-  Worker worker1(g_ceph_context), worker2(g_ceph_context);
+  Worker worker1(g_ceph_context, 0), worker2(g_ceph_context, 1);
   atomic_t count(0);
   Mutex lock("DispatchTest::lock");
   Cond cond;
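
submit_to() offers a fire-and-forget mode (nowait: heap-allocated callback that deletes itself) and a synchronous mode (stack-allocated callback plus a condition-variable wait), with a fast path that runs the functor inline when the caller is already on the target center's thread. A self-contained model of the synchronous mode, with MiniLoop standing in for the target EventCenter:

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    class MiniLoop {
      std::mutex m;
      std::condition_variable cv;
      std::queue<std::function<void()>> q;
      bool stopping = false;
      std::thread thr;

     public:
      MiniLoop() : thr([this] { run(); }) {}
      ~MiniLoop() {
        { std::lock_guard<std::mutex> l(m); stopping = true; }
        cv.notify_all();
        thr.join();
      }
      void run() {
        std::unique_lock<std::mutex> l(m);
        while (true) {
          cv.wait(l, [this] { return stopping || !q.empty(); });
          if (stopping && q.empty()) return;
          auto f = std::move(q.front());
          q.pop();
          l.unlock();
          f();            // run outside the queue lock, like process_events()
          l.lock();
        }
      }
      void submit_wait(std::function<void()> f) {
        std::mutex done_m;
        std::condition_variable done_cv;
        bool done = false;
        {
          std::lock_guard<std::mutex> l(m);
          q.push([&] {
            f();
            { std::lock_guard<std::mutex> dl(done_m); done = true; }
            done_cv.notify_all();
          });
        }
        cv.notify_all();
        std::unique_lock<std::mutex> l(done_m);
        done_cv.wait(l, [&] { return done; });   // woken once the functor has run
      }
    };

    int main() {
      MiniLoop loop;
      int x = 0;
      loop.submit_wait([&x] { x = 42; });   // blocks until the loop thread ran it
      std::cout << x << std::endl;          // prints 42
    }
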
diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc
index 76b975af118a9..d7f30756733f9 100644
--- a/src/test/msgr/test_msgr.cc
+++ b/src/test/msgr/test_msgr.cc
@@ -369,6 +369,44 @@ TEST_P(MessengerTest, FeatureTest) {
   client_msgr->wait();
 }
 
+TEST_P(MessengerTest, TimeoutTest) {
+  g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1");
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  entity_addr_t bind_addr;
+  bind_addr.parse("127.0.0.1");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // 1. build the connection
+  MPing *m = new MPing();
+  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  {
+    ASSERT_EQ(conn->send_message(m), 0);
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  ASSERT_TRUE(conn->is_connected());
+  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_TRUE(conn->peer_is_osd());
+
+  // 2. wait for idle
+  usleep(2500*1000);
+  ASSERT_FALSE(conn->is_connected());
+
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900");
+}
+
 TEST_P(MessengerTest, StatefulTest) {
   Message *m;
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
diff --git a/src/test/perf_local.cc b/src/test/perf_local.cc
index 7c33dc5d45462..7320dcd6d53a0 100644
--- a/src/test/perf_local.cc
+++ b/src/test/perf_local.cc
@@ -450,7 +450,7 @@ double eventcenter_poll()
 {
   int count = 1000000;
   EventCenter center(g_ceph_context);
-  center.init(1000);
+  center.init(1000, 0);
   center.set_owner();
   uint64_t start = Cycles::rdtsc();
   for (int i = 0; i < count; i++) {
@@ -467,7 +467,7 @@ class CenterWorker : public Thread {
  public:
   EventCenter center;
   explicit CenterWorker(CephContext *c): cct(c), done(false), center(c) {
-    center.init(100);
+    center.init(100, 0);
   }
   void stop() {
     done = true;
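
The tests now pass an explicit index to EventCenter::init() because every center registers itself in the process-wide AssociatedCenters table under that slot, and set_owner() asserts the slot is free; concurrent centers therefore need distinct indexes (0 and 1 in DispatchTest). A toy sketch of that registry invariant:

    #include <cassert>
    #include <cstring>

    struct Registry {                       // toy AssociatedCenters
      static const unsigned kMax = 24;      // mirrors MAX_EVENTCENTER
      void *slots[kMax];
      Registry() { memset(slots, 0, sizeof(slots)); }
      void claim(unsigned idx, void *center) {
        assert(idx < kMax && !slots[idx]);  // slot must exist and be unclaimed
        slots[idx] = center;
      }
    };
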