From 8f0cd8e2a817c6bcec0e11e469463812cf451fc1 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 15:23:50 +0800 Subject: [PATCH 01/28] Event: use multimap instead of map Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 35 +++++++++++++---------------------- src/msg/async/Event.h | 2 +- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index eb20406774e7d..ff535c7b81965 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -232,7 +232,7 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds); event.id = id; event.time_cb = ctxt; - time_events[expire].push_back(event); + time_events[expire] = event; if (expire < next_time) wakeup(); @@ -248,14 +248,9 @@ void EventCenter::delete_time_event(uint64_t id) return ; for (auto it = time_events.begin(); it != time_events.end(); ++it) { - for (list::iterator j = it->second.begin(); - j != it->second.end(); ++j) { - if (j->id == id) { - it->second.erase(j); - if (it->second.empty()) - time_events.erase(it); - return ; - } + if (it->second.id == id) { + time_events.erase(it); + return ; } } } @@ -294,19 +289,15 @@ int EventCenter::process_time_events() while (!time_events.empty()) { auto it = time_events.begin(); if (now >= it->first || clock_skewed) { - if (it->second.empty()) { - time_events.erase(it); - } else { - TimeEvent &e = it->second.front(); - EventCallbackRef cb = e.time_cb; - uint64_t id = e.id; - it->second.pop_front(); - ldout(cct, 10) << __func__ << " process time event: id=" << id << dendl; - processed++; - time_lock.Unlock(); - cb->do_request(id); - time_lock.Lock(); - } + TimeEvent &e = it->second; + EventCallbackRef cb = e.time_cb; + uint64_t id = e.id; + time_events.erase(it); + ldout(cct, 10) << __func__ << " process time event: id=" << id << dendl; + processed++; + time_lock.Unlock(); + cb->do_request(id); + time_lock.Lock(); } else { break; } diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 4618c156b0d92..14c3ab429cce2 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -109,7 +109,7 @@ class EventCenter { deque external_events; vector file_events; EventDriver *driver; - map > time_events; + multimap time_events; uint64_t time_event_next_id; clock_type::time_point last_time; // last time process time event clock_type::time_point next_time; // next wake up time From d493b6bf62b0d14db55cd84b7fcc583e59fd1a9f Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 15:24:50 +0800 Subject: [PATCH 02/28] Event: remove clock skew detect since we use mono clock now Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 13 +------------ src/msg/async/Event.h | 2 -- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index ff535c7b81965..8d973a2fcf0c5 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -275,20 +275,9 @@ int EventCenter::process_time_events() ldout(cct, 10) << __func__ << " cur time is " << now << dendl; Mutex::Locker l(time_lock); - /* If the system clock is moved to the future, and then set back to the - * right value, time events may be delayed in a random way. Often this - * means that scheduled operations will not be performed soon enough. - * - * Here we try to detect system clock skews, and force all the time - * events to be processed ASAP when this happens: the idea is that - * processing events earlier is less dangerous than delaying them - * indefinitely, and practice suggests it is. */ - bool clock_skewed = now < last_time; - last_time = now; - while (!time_events.empty()) { auto it = time_events.begin(); - if (now >= it->first || clock_skewed) { + if (now >= it->first) { TimeEvent &e = it->second; EventCallbackRef cb = e.time_cb; uint64_t id = e.id; diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 14c3ab429cce2..7a72179dfa1d6 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -111,7 +111,6 @@ class EventCenter { EventDriver *driver; multimap time_events; uint64_t time_event_next_id; - clock_type::time_point last_time; // last time process time event clock_type::time_point next_time; // next wake up time int notify_receive_fd; int notify_send_fd; @@ -138,7 +137,6 @@ class EventCenter { notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), notify_handler(NULL), already_wakeup(0) { - last_time = clock_type::now(); } ~EventCenter(); ostream& _event_prefix(std::ostream *_dout); From 7ca8839f97c059d91a476658e3ffbedb2a7b5424 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 15:34:29 +0800 Subject: [PATCH 03/28] Event: use local_center instead of pthread id Signed-off-by: Haomai Wang --- src/msg/async/AsyncMessenger.cc | 2 -- src/msg/async/Event.cc | 10 ++++------ src/msg/async/Event.h | 14 +++++++++----- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 50d48c3ecb0d6..8c4ff3a7f862c 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -513,9 +513,7 @@ void WorkerPool::release_worker(EventCenter* c) void WorkerPool::barrier() { ldout(cct, 10) << __func__ << " started." << dendl; - pthread_t cur = pthread_self(); for (vector::iterator it = workers.begin(); it != workers.end(); ++it) { - assert(cur != (*it)->center.get_owner()); barrier_count.inc(); (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this))); } diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 8d973a2fcf0c5..e17db8aebb4de 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -57,11 +57,11 @@ class C_handle_notify : public EventCallback { ostream& EventCenter::_event_prefix(std::ostream *_dout) { - return *_dout << "Event(" << this << " owner=" << get_owner() << " nevent=" << nevent + return *_dout << "Event(" << this << " nevent=" << nevent << " time_id=" << time_event_next_id << ")."; } -static thread_local pthread_t thread_id = 0; +thread_local EventCenter* local_center = nullptr; int EventCenter::init(int n) { @@ -141,7 +141,7 @@ EventCenter::~EventCenter() void EventCenter::set_owner() { - thread_id = owner = pthread_self(); + local_center = this; } int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) @@ -297,8 +297,6 @@ int EventCenter::process_time_events() int EventCenter::process_events(int timeout_microseconds) { - // Must set owner before looping - assert(owner); struct timeval tv; int numevents; bool trigger_time = false; @@ -398,7 +396,7 @@ void EventCenter::dispatch_event_external(EventCallbackRef e) external_events.push_back(e); uint64_t num = external_num_events.inc(); external_lock.Unlock(); - if (thread_id != owner) + if (!in_thread()) wakeup(); ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl; diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 7a72179dfa1d6..b5fe2171f6e72 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -37,8 +37,6 @@ #endif #endif -#include - #include "include/atomic.h" #include "include/Context.h" #include "include/unordered_map.h" @@ -81,6 +79,11 @@ class EventDriver { virtual int resize_events(int newsize) = 0; }; +extern thread_local EventCenter* local_center; + +inline EventCenter* center() { + return local_center; +} /* * EventCenter maintain a set of file descriptor and handle registered events. @@ -115,7 +118,6 @@ class EventCenter { int notify_receive_fd; int notify_send_fd; NetHandler net; - pthread_t owner; EventCallbackRef notify_handler; int process_time_events(); @@ -134,7 +136,7 @@ class EventCenter { time_lock("AsyncMessenger::time_lock"), external_num_events(0), driver(NULL), time_event_next_id(1), - notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), + notify_receive_fd(-1), notify_send_fd(-1), net(c), notify_handler(NULL), already_wakeup(0) { } @@ -143,7 +145,6 @@ class EventCenter { int init(int nevent); void set_owner(); - pthread_t get_owner() { return owner; } // Used by internal thread int create_file_event(int fd, int mask, EventCallbackRef ctxt); @@ -155,6 +156,9 @@ class EventCenter { // Used by external thread void dispatch_event_external(EventCallbackRef e); + inline bool in_thread() const { + return local_center == this; + } }; #endif From ba470cea09bcc9b57e7647dc0a24c80bf0e54a5e Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 8 Mar 2016 14:31:19 +0800 Subject: [PATCH 04/28] Event: remove time_lock which is no need Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 10 +++------- src/msg/async/Event.h | 3 +-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index e17db8aebb4de..5bad77605f51b 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -224,7 +224,7 @@ void EventCenter::delete_file_event(int fd, int mask) uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt) { - Mutex::Locker l(time_lock); + assert(in_thread()); uint64_t id = time_event_next_id++; ldout(cct, 10) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl; @@ -242,7 +242,7 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef // TODO: Ineffective implementation now! void EventCenter::delete_time_event(uint64_t id) { - Mutex::Locker l(time_lock); + assert(in_thread()); ldout(cct, 10) << __func__ << " id=" << id << dendl; if (id >= time_event_next_id) return ; @@ -274,7 +274,6 @@ int EventCenter::process_time_events() clock_type::time_point now = clock_type::now(); ldout(cct, 10) << __func__ << " cur time is " << now << dendl; - Mutex::Locker l(time_lock); while (!time_events.empty()) { auto it = time_events.begin(); if (now >= it->first) { @@ -284,9 +283,7 @@ int EventCenter::process_time_events() time_events.erase(it); ldout(cct, 10) << __func__ << " process time event: id=" << id << dendl; processed++; - time_lock.Unlock(); cb->do_request(id); - time_lock.Lock(); } else { break; } @@ -311,9 +308,8 @@ int EventCenter::process_events(int timeout_microseconds) clock_type::time_point shortest; shortest = now + std::chrono::microseconds(timeout_microseconds); - Mutex::Locker l(time_lock); auto it = time_events.begin(); - if (it != time_events.end() && shortest > it->first) { + if (it != time_events.end() && shortest >= it->first) { ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl; shortest = it->first; trigger_time = true; diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index b5fe2171f6e72..9cf1a920deeb1 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -107,7 +107,7 @@ class EventCenter { CephContext *cct; int nevent; // Used only to external event - Mutex external_lock, file_lock, time_lock; + Mutex external_lock, file_lock; atomic_t external_num_events; deque external_events; vector file_events; @@ -133,7 +133,6 @@ class EventCenter { cct(c), nevent(0), external_lock("AsyncMessenger::external_lock"), file_lock("AsyncMessenger::file_lock"), - time_lock("AsyncMessenger::time_lock"), external_num_events(0), driver(NULL), time_event_next_id(1), notify_receive_fd(-1), notify_send_fd(-1), net(c), From e515c9dfea29c1ca7e2dd575486e36322c22973e Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 8 Mar 2016 14:33:45 +0800 Subject: [PATCH 05/28] Event: replace atomic_t with c++ atomic Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 8 ++++---- src/msg/async/Event.h | 7 +++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 5bad77605f51b..df96343451f83 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -300,7 +300,7 @@ int EventCenter::process_events(int timeout_microseconds) auto now = clock_type::now(); // If exists external events, don't block - if (external_num_events.read()) { + if (external_num_events.load()) { tv.tv_sec = 0; tv.tv_usec = 0; next_time = now; @@ -365,14 +365,14 @@ int EventCenter::process_events(int timeout_microseconds) if (trigger_time) numevents += process_time_events(); - if (external_num_events.read()) { + if (external_num_events.load()) { external_lock.Lock(); if (external_events.empty()) { external_lock.Unlock(); } else { deque cur_process; cur_process.swap(external_events); - external_num_events.set(0); + external_num_events.store(0); external_lock.Unlock(); while (!cur_process.empty()) { EventCallbackRef e = cur_process.front(); @@ -390,7 +390,7 @@ void EventCenter::dispatch_event_external(EventCallbackRef e) { external_lock.Lock(); external_events.push_back(e); - uint64_t num = external_num_events.inc(); + uint64_t num = ++external_num_events; external_lock.Unlock(); if (!in_thread()) wakeup(); diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 9cf1a920deeb1..356c74ef85121 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -37,7 +37,10 @@ #endif #endif -#include "include/atomic.h" +#include +#include +#include + #include "include/Context.h" #include "include/unordered_map.h" #include "common/ceph_time.h" @@ -108,7 +111,7 @@ class EventCenter { int nevent; // Used only to external event Mutex external_lock, file_lock; - atomic_t external_num_events; + std::atomic_ulong external_num_events; deque external_events; vector file_events; EventDriver *driver; From eebc6c2b8208c7c11b2e8d4990ba8f61332c1cf4 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 8 Mar 2016 14:35:17 +0800 Subject: [PATCH 06/28] Event: replace Mutex with c++ mutex Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 28 ++++++++++++++-------------- src/msg/async/Event.h | 8 +++----- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index df96343451f83..0a6afb4654cf6 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -117,7 +117,7 @@ int EventCenter::init(int n) EventCenter::~EventCenter() { { - Mutex::Locker l(external_lock); + std::lock_guard l(external_lock); while (!external_events.empty()) { EventCallbackRef e = external_events.front(); if (e) @@ -147,7 +147,7 @@ void EventCenter::set_owner() int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) { int r = 0; - Mutex::Locker l(file_lock); + std::lock_guard l(file_lock); if (fd >= nevent) { int new_size = nevent << 2; while (fd > new_size) @@ -192,7 +192,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) void EventCenter::delete_file_event(int fd, int mask) { assert(fd >= 0); - Mutex::Locker l(file_lock); + std::lock_guard l(file_lock); if (fd >= nevent) { ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent << "mask=" << mask << dendl; @@ -329,7 +329,7 @@ int EventCenter::process_events(int timeout_microseconds) ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; vector fired_events; numevents = driver->event_wait(fired_events, &tv); - file_lock.Lock(); + file_lock.lock(); for (int j = 0; j < numevents; j++) { int rfired = 0; FileEvent *event; @@ -344,36 +344,36 @@ int EventCenter::process_events(int timeout_microseconds) if (event->mask & fired_events[j].mask & EVENT_READABLE) { rfired = 1; cb = event->read_cb; - file_lock.Unlock(); + file_lock.unlock(); cb->do_request(fired_events[j].fd); - file_lock.Lock(); + file_lock.lock(); } if (event->mask & fired_events[j].mask & EVENT_WRITABLE) { if (!rfired || event->read_cb != event->write_cb) { cb = event->write_cb; - file_lock.Unlock(); + file_lock.unlock(); cb->do_request(fired_events[j].fd); - file_lock.Lock(); + file_lock.lock(); } } ldout(cct, 20) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; } - file_lock.Unlock(); + file_lock.unlock(); if (trigger_time) numevents += process_time_events(); if (external_num_events.load()) { - external_lock.Lock(); + external_lock.lock(); if (external_events.empty()) { - external_lock.Unlock(); + external_lock.unlock(); } else { deque cur_process; cur_process.swap(external_events); external_num_events.store(0); - external_lock.Unlock(); + external_lock.unlock(); while (!cur_process.empty()) { EventCallbackRef e = cur_process.front(); if (e) @@ -388,10 +388,10 @@ int EventCenter::process_events(int timeout_microseconds) void EventCenter::dispatch_event_external(EventCallbackRef e) { - external_lock.Lock(); + external_lock.lock(); external_events.push_back(e); uint64_t num = ++external_num_events; - external_lock.Unlock(); + external_lock.unlock(); if (!in_thread()) wakeup(); diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 356c74ef85121..344f322e18e4f 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -41,10 +41,10 @@ #include #include -#include "include/Context.h" +#include "include/utime.h" #include "include/unordered_map.h" #include "common/ceph_time.h" -#include "common/WorkQueue.h" +#include "common/dout.h" #include "net_handler.h" #define EVENT_NONE 0 @@ -110,7 +110,7 @@ class EventCenter { CephContext *cct; int nevent; // Used only to external event - Mutex external_lock, file_lock; + std::mutex external_lock, file_lock;; std::atomic_ulong external_num_events; deque external_events; vector file_events; @@ -134,8 +134,6 @@ class EventCenter { explicit EventCenter(CephContext *c): cct(c), nevent(0), - external_lock("AsyncMessenger::external_lock"), - file_lock("AsyncMessenger::file_lock"), external_num_events(0), driver(NULL), time_event_next_id(1), notify_receive_fd(-1), notify_send_fd(-1), net(c), From 018b48c1c6c98db290b21b28a274556f80a16fb3 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 15:52:44 +0800 Subject: [PATCH 07/28] Event: remove extra header files Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 2 -- src/msg/async/Event.h | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 0a6afb4654cf6..e6d7fcdebd697 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -14,8 +14,6 @@ * */ -#include - #include "common/errno.h" #include "Event.h" diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 344f322e18e4f..d51b6bc13e7fa 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -41,8 +41,6 @@ #include #include -#include "include/utime.h" -#include "include/unordered_map.h" #include "common/ceph_time.h" #include "common/dout.h" #include "net_handler.h" From b58f6df23da8b4ba84b70def4f61ac99e20c0aac Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 16:24:12 +0800 Subject: [PATCH 08/28] Event: remove next_time which isn't needed Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 4 ---- src/msg/async/Event.h | 1 - 2 files changed, 5 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index e6d7fcdebd697..9549c3ebc4f93 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -231,8 +231,6 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef event.id = id; event.time_cb = ctxt; time_events[expire] = event; - if (expire < next_time) - wakeup(); return id; } @@ -301,7 +299,6 @@ int EventCenter::process_events(int timeout_microseconds) if (external_num_events.load()) { tv.tv_sec = 0; tv.tv_usec = 0; - next_time = now; } else { clock_type::time_point shortest; shortest = now + std::chrono::microseconds(timeout_microseconds); @@ -321,7 +318,6 @@ int EventCenter::process_events(int timeout_microseconds) } tv.tv_sec = timeout_microseconds / 1000000; tv.tv_usec = timeout_microseconds % 1000000; - next_time = shortest; } ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index d51b6bc13e7fa..09a9b781c945f 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -115,7 +115,6 @@ class EventCenter { EventDriver *driver; multimap time_events; uint64_t time_event_next_id; - clock_type::time_point next_time; // next wake up time int notify_receive_fd; int notify_send_fd; NetHandler net; From 6e24d9a334bbfbb1c16cd1da78b364114e9bb854 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 16:34:19 +0800 Subject: [PATCH 09/28] Event: implement effective delete_time_event Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 17 ++++++++++------- src/msg/async/Event.h | 3 ++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 9549c3ebc4f93..eb67b0bd1c1c2 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -230,12 +230,13 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds); event.id = id; event.time_cb = ctxt; - time_events[expire] = event; + std::multimap::value_type s_val(expire, event); + auto it = time_events.insert(std::move(s_val)); + event_map[id] = it; return id; } -// TODO: Ineffective implementation now! void EventCenter::delete_time_event(uint64_t id) { assert(in_thread()); @@ -243,12 +244,13 @@ void EventCenter::delete_time_event(uint64_t id) if (id >= time_event_next_id) return ; - for (auto it = time_events.begin(); it != time_events.end(); ++it) { - if (it->second.id == id) { - time_events.erase(it); - return ; - } + auto it = event_map.find(id); + if (it == event_map.end()) { + ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl; + return ; } + + time_events.erase(it->second); } void EventCenter::wakeup() @@ -277,6 +279,7 @@ int EventCenter::process_time_events() EventCallbackRef cb = e.time_cb; uint64_t id = e.id; time_events.erase(it); + event_map.erase(id); ldout(cct, 10) << __func__ << " process time event: id=" << id << dendl; processed++; cb->do_request(id); diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 09a9b781c945f..a44f793fec837 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -113,7 +113,8 @@ class EventCenter { deque external_events; vector file_events; EventDriver *driver; - multimap time_events; + std::multimap time_events; + std::map::iterator> event_map; uint64_t time_event_next_id; int notify_receive_fd; int notify_send_fd; From fe6564ce62be3d5762dc5c638b62e3517101cd76 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 17:09:07 +0800 Subject: [PATCH 10/28] AsyncConnection: make connection timeout when idle a lot Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 28 +++++++++++++++++++++++++++- src/msg/async/AsyncConnection.h | 5 +++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 00f7d42912efd..918d36b70d909 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -87,6 +87,16 @@ class C_clean_handler : public EventCallback { } }; +class C_tick_wakeup : public EventCallback { + AsyncConnectionRef conn; + + public: + explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {} + void do_request(int fd_or_id) { + conn->tick(); + } +}; + static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment @@ -116,12 +126,14 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), - recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false), + recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()), + got_bad_auth(false), authorizer(NULL), replacing(false), is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) { read_handler = new C_handle_read(this); write_handler = new C_handle_write(this); wakeup_handler = new C_time_wakeup(this); + tick_handler = new C_tick_wakeup(this); memset(msgvec, 0, sizeof(msgvec)); // double recv_max_prefetch see "read_until" recv_buf = new char[2*recv_max_prefetch]; @@ -459,6 +471,7 @@ void AsyncConnection::process() int prev_state = state; bool already_dispatch_writer = false; Mutex::Locker l(lock); + last_active = ceph::coarse_mono_clock::now(); do { ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl; prev_state = state; @@ -2587,3 +2600,16 @@ void AsyncConnection::wakeup_from(uint64_t id) lock.Unlock(); process(); } + +void AsyncConnection::tick() +{ + Mutex::Locker l(lock); + auto now = ceph::coarse_mono_clock::now(); + auto idle_period = std::chrono::duration_cast(now - last_active).count(); + if (async_msgr->cct->_conf->ms_tcp_read_timeout > (uint64_t)idle_period) { + ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than " + << async_msgr->cct->_conf->ms_tcp_read_timeout + << ", mark self fault." << dendl; + fault(); + } +} diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index af1747951c0c2..f583face7cd31 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -26,6 +26,7 @@ using namespace std; #include "auth/AuthSessionHandler.h" +#include "common/ceph_time.h" #include "common/Mutex.h" #include "common/perf_counters.h" #include "include/buffer.h" @@ -312,12 +313,14 @@ class AsyncConnection : public Connection { EventCallbackRef read_handler; EventCallbackRef write_handler; EventCallbackRef wakeup_handler; + EventCallbackRef tick_handler; struct iovec msgvec[ASYNC_IOV_MAX]; char *recv_buf; uint32_t recv_max_prefetch; uint32_t recv_start; uint32_t recv_end; set register_time_events; // need to delete it if stop + ceph::coarse_mono_clock::time_point last_active; // Tis section are temp variables used by state transition @@ -366,6 +369,7 @@ class AsyncConnection : public Connection { void handle_write(); void process(); void wakeup_from(uint64_t id); + void tick(); void local_deliver(); void stop(bool queue_reset) { lock.Lock(); @@ -379,6 +383,7 @@ class AsyncConnection : public Connection { delete read_handler; delete write_handler; delete wakeup_handler; + delete tick_handler; if (delay_state) { delete delay_state; delay_state = NULL; From dd9c43a44018ba8309e8e81af13840d8376f59fa Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 24 May 2016 17:10:15 +0800 Subject: [PATCH 11/28] test_async_driver: add missing headers Signed-off-by: Haomai Wang --- src/test/msgr/test_async_driver.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc index 59ac8bf6a3396..589d5a6cc4450 100644 --- a/src/test/msgr/test_async_driver.cc +++ b/src/test/msgr/test_async_driver.cc @@ -25,6 +25,8 @@ #include #include "include/Context.h" #include "include/atomic.h" +#include "common/Mutex.h" +#include "common/Cond.h" #include "global/global_init.h" #include "common/ceph_argparse.h" #include "msg/async/Event.h" From fa4767cc76357fac12c1dd46a112790b302d1177 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 01:22:47 +0800 Subject: [PATCH 12/28] AsyncConnection: add tick timer Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 32 ++++++++++++++++++++++++-------- src/msg/async/AsyncConnection.h | 4 +++- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 918d36b70d909..366bdc6f4df52 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -93,7 +93,7 @@ class C_tick_wakeup : public EventCallback { public: explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {} void do_request(int fd_or_id) { - conn->tick(); + conn->tick(fd_or_id); } }; @@ -126,7 +126,9 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), - recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()), + recv_start(0), recv_end(0), + last_active(ceph::coarse_mono_clock::now()), + inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), got_bad_auth(false), authorizer(NULL), replacing(false), is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) { @@ -1304,6 +1306,10 @@ ssize_t AsyncConnection::_process_connection() dispatch_queue->queue_connect(this); async_msgr->ms_deliver_handle_fast_connect(this); + // make sure no pending tick timer + center->delete_time_event(last_tick_id); + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); + // message may in queue between last _try_send and connection ready // write event may already notify and we need to force scheduler again write_lock.Lock(); @@ -1471,6 +1477,11 @@ ssize_t AsyncConnection::_process_connection() ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl; state = STATE_OPEN; memset(&connect_msg, 0, sizeof(connect_msg)); + + // make sure no pending tick timer + center->delete_time_event(last_tick_id); + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); + write_lock.Lock(); can_write = WriteStatus::CANWRITE; if (is_queued()) @@ -2601,15 +2612,20 @@ void AsyncConnection::wakeup_from(uint64_t id) process(); } -void AsyncConnection::tick() +void AsyncConnection::tick(uint64_t id) { - Mutex::Locker l(lock); auto now = ceph::coarse_mono_clock::now(); - auto idle_period = std::chrono::duration_cast(now - last_active).count(); - if (async_msgr->cct->_conf->ms_tcp_read_timeout > (uint64_t)idle_period) { + ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id + << " last_active" << last_active << dendl; + assert(last_tick_id == id); + Mutex::Locker l(lock); + auto idle_period = std::chrono::duration_cast(now - last_active).count(); + if (inactive_timeout_us < (uint64_t)idle_period) { ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than " - << async_msgr->cct->_conf->ms_tcp_read_timeout - << ", mark self fault." << dendl; + << inactive_timeout_us + << " us, mark self fault." << dendl; fault(); + } else if (is_connected()) { + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); } } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index f583face7cd31..f9867a572f336 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -321,6 +321,8 @@ class AsyncConnection : public Connection { uint32_t recv_end; set register_time_events; // need to delete it if stop ceph::coarse_mono_clock::time_point last_active; + uint64_t last_tick_id = 0; + const uint64_t inactive_timeout_us; // Tis section are temp variables used by state transition @@ -369,7 +371,7 @@ class AsyncConnection : public Connection { void handle_write(); void process(); void wakeup_from(uint64_t id); - void tick(); + void tick(uint64_t id); void local_deliver(); void stop(bool queue_reset) { lock.Lock(); From 9d952ad82032cff73e7b2ba3245747fbc9e64169 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 01:24:04 +0800 Subject: [PATCH 13/28] test_msgr: add connection read timeout tests Signed-off-by: Haomai Wang --- src/test/msgr/test_msgr.cc | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 76b975af118a9..d7f30756733f9 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -369,6 +369,44 @@ TEST_P(MessengerTest, FeatureTest) { client_msgr->wait(); } +TEST_P(MessengerTest, TimeoutTest) { + g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1"); + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("127.0.0.1"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // 1. build the connection + MPing *m = new MPing(); + ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + { + ASSERT_EQ(conn->send_message(m), 0); + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_new) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_new = false; + } + ASSERT_TRUE(conn->is_connected()); + ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_TRUE(conn->peer_is_osd()); + + // 2. wait for idle + usleep(2500*1000); + ASSERT_FALSE(conn->is_connected()); + + server_msgr->shutdown(); + server_msgr->wait(); + + client_msgr->shutdown(); + client_msgr->wait(); + g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900"); +} + TEST_P(MessengerTest, StatefulTest) { Message *m; FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); From 761f1471594eca73d030645d75ae6f764e340430 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 09:36:35 +0800 Subject: [PATCH 14/28] AsyncConnection: move delete timers to cleanup handler Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 3 --- src/msg/async/AsyncConnection.h | 4 ++++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 366bdc6f4df52..000b6bfeeb28d 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2236,9 +2236,6 @@ void AsyncConnection::_stop() ::close(sd); } sd = -1; - for (set::iterator it = register_time_events.begin(); - it != register_time_events.end(); ++it) - center->delete_time_event(*it); // Make sure in-queue events will been processed center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this))); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index f9867a572f336..aac571a049762 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -382,6 +382,10 @@ class AsyncConnection : public Connection { dispatch_queue->queue_reset(this); } void cleanup_handler() { + for (auto &&t : register_time_events) + center->delete_time_event(t); + register_time_events.clear(); + center->delete_time_event(last_tick_id); delete read_handler; delete write_handler; delete wakeup_handler; From 4dc2e1367bdc21fcd02526cd5a713f96cbd7a77c Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 09:42:14 +0800 Subject: [PATCH 15/28] AsyncConnection: add EventCenter id Signed-off-by: Haomai Wang --- src/msg/async/AsyncMessenger.cc | 2 +- src/msg/async/Event.cc | 5 ++++- src/msg/async/Event.h | 4 +++- src/test/msgr/test_async_driver.cc | 8 ++++---- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 8c4ff3a7f862c..1dcd00dcd3dfe 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -64,7 +64,7 @@ class Worker : public Thread { std::atomic_uint references; Worker(CephContext *c, WorkerPool *p, int i) : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { - center.init(InitEventNumber); + center.init(InitEventNumber, i); char name[128]; sprintf(name, "AsyncMessenger::Worker-%d", id); // initialize perf_logger diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index eb67b0bd1c1c2..9cd1bc051df85 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -61,10 +61,13 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) thread_local EventCenter* local_center = nullptr; -int EventCenter::init(int n) +int EventCenter::init(int n, unsigned i) { // can't init multi times assert(nevent == 0); + + id = i; + #ifdef HAVE_EPOLL driver = new EpollDriver(cct); #else diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index a44f793fec837..353812e3d5dec 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -120,6 +120,7 @@ class EventCenter { int notify_send_fd; NetHandler net; EventCallbackRef notify_handler; + unsigned id = 10000; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -141,8 +142,9 @@ class EventCenter { ~EventCenter(); ostream& _event_prefix(std::ostream *_dout); - int init(int nevent); + int init(int nevent, unsigned id); void set_owner(); + unsigned get_id() { return id; } // Used by internal thread int create_file_event(int fd, int mask, EventCallbackRef ctxt); diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc index 589d5a6cc4450..adcbb99160e8f 100644 --- a/src/test/msgr/test_async_driver.cc +++ b/src/test/msgr/test_async_driver.cc @@ -255,7 +255,7 @@ class FakeEvent : public EventCallback { TEST(EventCenterTest, FileEventExpansion) { vector sds; EventCenter center(g_ceph_context); - center.init(100); + center.init(100, 0); EventCallbackRef e(new FakeEvent()); for (int i = 0; i < 300; i++) { int sd = ::socket(AF_INET, SOCK_STREAM, 0); @@ -274,8 +274,8 @@ class Worker : public Thread { public: EventCenter center; - explicit Worker(CephContext *c): cct(c), done(false), center(c) { - center.init(100); + explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) { + center.init(100, idx); } void stop() { done = true; @@ -305,7 +305,7 @@ class CountEvent: public EventCallback { }; TEST(EventCenterTest, DispatchTest) { - Worker worker1(g_ceph_context), worker2(g_ceph_context); + Worker worker1(g_ceph_context, 0), worker2(g_ceph_context, 1); atomic_t count(0); Mutex lock("DispatchTest::lock"); Cond cond; From c33753e97e1e7bc86408b6b69812f4a620cd2fcb Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 09:45:52 +0800 Subject: [PATCH 16/28] Event: add submit_to apis Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 3 ++- src/msg/async/Event.h | 51 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 9cd1bc051df85..df53bee4de2ef 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -59,6 +59,7 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) << " time_id=" << time_event_next_id << ")."; } +EventCenter *EventCenter::centers[MAX_EVENTCENTER]; thread_local EventCenter* local_center = nullptr; int EventCenter::init(int n, unsigned i) @@ -142,7 +143,7 @@ EventCenter::~EventCenter() void EventCenter::set_owner() { - local_center = this; + centers[id] = local_center = this; } int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 353812e3d5dec..f02e113a7b9fe 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -91,6 +91,10 @@ inline EventCenter* center() { */ class EventCenter { using clock_type = ceph::coarse_mono_clock; + // should be enough; + static const int MAX_EVENTCENTER = 24; + static EventCenter *centers[MAX_EVENTCENTER]; + struct FileEvent { int mask; EventCallbackRef read_cb; @@ -159,6 +163,53 @@ class EventCenter { inline bool in_thread() const { return local_center == this; } + + private: + template + class C_submit_event : public EventCallback { + std::mutex lock; + std::condition_variable cond; + bool done = false; + func f; + bool nonwait; + public: + C_submit_event(func &&_f, bool nw) + : f(std::move(_f)), nonwait(nw) {} + void do_request(int id) { + f(); + lock.lock(); + cond.notify_all(); + done = true; + lock.unlock(); + if (nonwait) + delete this; + } + void wait() { + assert(!nonwait); + std::unique_lock l(lock); + while (!done) + cond.wait(l); + } + }; + + public: + template + static void submit_to(int i, func &&f, bool nowait = false) { + assert(i < MAX_EVENTCENTER); + EventCenter *c = centers[i]; + if (c->in_thread()) { + f(); + return ; + } + if (nowait) { + C_submit_event *event = new C_submit_event(std::move(f), true); + c->dispatch_event_external(event); + } else { + C_submit_event event(std::move(f), false); + c->dispatch_event_external(&event); + event.wait(); + } + }; }; #endif From ef7c3e304e7e03744fa6b35876e6e42316c66680 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 10:07:18 +0800 Subject: [PATCH 17/28] AsyncConnection: execute existing fault in another Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 000b6bfeeb28d..2e9eb39f05ec1 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1822,17 +1822,21 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis assert(recv_start == recv_end); existing->write_lock.Unlock(); - if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { - // handle error - ldout(async_msgr->cct, 0) << __func__ << " reply fault for existing connection." << dendl; - existing->fault(); - } ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl; _stop(); // queue a reset on the new connection, which we're dumping for the old dispatch_queue->queue_reset(this); existing->lock.Unlock(); + + EventCenter::submit_to(existing->center->get_id(), [existing, connect, reply, authorizer_reply]() mutable { + Mutex::Locker l(existing->lock); + if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { + // handle error + existing->fault(); + } + }, true); + return 0; } existing->lock.Unlock(); From cb58088256194825a42df426d52b97b4fa090563 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 10:24:13 +0800 Subject: [PATCH 18/28] Event: change id to idx Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 4 ++-- src/msg/async/Event.h | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index df53bee4de2ef..b8d806ef356ba 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -67,7 +67,7 @@ int EventCenter::init(int n, unsigned i) // can't init multi times assert(nevent == 0); - id = i; + idx = i; #ifdef HAVE_EPOLL driver = new EpollDriver(cct); @@ -143,7 +143,7 @@ EventCenter::~EventCenter() void EventCenter::set_owner() { - centers[id] = local_center = this; + centers[idx] = local_center = this; } int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index f02e113a7b9fe..f692c71bbdbf0 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -124,7 +124,7 @@ class EventCenter { int notify_send_fd; NetHandler net; EventCallbackRef notify_handler; - unsigned id = 10000; + unsigned idx = 10000; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -146,9 +146,9 @@ class EventCenter { ~EventCenter(); ostream& _event_prefix(std::ostream *_dout); - int init(int nevent, unsigned id); + int init(int nevent, unsigned idx); void set_owner(); - unsigned get_id() { return id; } + unsigned get_id() { return idx; } // Used by internal thread int create_file_event(int fd, int mask, EventCallbackRef ctxt); From 65874f1e2414e985c04b089df868579c2bf92d17 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 12:29:29 +0800 Subject: [PATCH 19/28] perf_local: fix api adjust Signed-off-by: Haomai Wang --- src/test/perf_local.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/perf_local.cc b/src/test/perf_local.cc index 7c33dc5d45462..7320dcd6d53a0 100644 --- a/src/test/perf_local.cc +++ b/src/test/perf_local.cc @@ -450,7 +450,7 @@ double eventcenter_poll() { int count = 1000000; EventCenter center(g_ceph_context); - center.init(1000); + center.init(1000, 0); center.set_owner(); uint64_t start = Cycles::rdtsc(); for (int i = 0; i < count; i++) { @@ -467,7 +467,7 @@ class CenterWorker : public Thread { public: EventCenter center; explicit CenterWorker(CephContext *c): cct(c), done(false), center(c) { - center.init(100); + center.init(100, 0); } void stop() { done = true; From fb480fd0adf0e961c9a006e1784aa9d4f766fd65 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 25 May 2016 17:55:04 +0800 Subject: [PATCH 20/28] AsyncConnection: make delete_time_event async Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 29 ++++++++--------------------- src/msg/async/AsyncConnection.h | 22 ++++++++++++---------- 2 files changed, 20 insertions(+), 31 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 2e9eb39f05ec1..4f8c0e3157a12 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2447,16 +2447,10 @@ void AsyncConnection::DelayedDelivery::do_request(int id) } } -class C_flush_messages : public EventCallback { - std::deque > delay_queue; - AsyncMessenger *msgr; - DispatchQueue *dispatch_queue; - uint64_t conn_id; - public: - C_flush_messages(std::deque > &&q, AsyncMessenger *m, - DispatchQueue *dq, uint64_t cid) - : delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {} - void do_request(int id) { +void AsyncConnection::DelayedDelivery::flush() { + EventCenter::submit_to( + center->get_id(), [this] () mutable { + Mutex::Locker l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; if (msgr->ms_can_fast_dispatch(m)) { @@ -2466,17 +2460,10 @@ class C_flush_messages : public EventCallback { } delay_queue.pop_front(); } - delete this; - } -}; - -void AsyncConnection::DelayedDelivery::flush() { - Mutex::Locker l(delay_lock); - center->dispatch_event_external( - new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id)); - for (auto i : register_time_events) - center->delete_time_event(i); - register_time_events.clear(); + for (auto i : register_time_events) + center->delete_time_event(i); + register_time_events.clear(); + }, true); } void AsyncConnection::send_keepalive() diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index aac571a049762..1a458c873e18b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -160,16 +160,18 @@ class AsyncConnection : public Connection { register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard() { - Mutex::Locker l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; - dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); - delay_queue.pop_front(); - } - for (auto i : register_time_events) - center->delete_time_event(i); - register_time_events.clear(); + EventCenter::submit_to(center->get_id(), [this] () mutable { + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); + m->put(); + delay_queue.pop_front(); + } + for (auto i : register_time_events) + center->delete_time_event(i); + register_time_events.clear(); + }, true); } void flush(); } *delay_state; From 8256211e1afb636cd99b287f6fbdb94dba0cab55 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Thu, 26 May 2016 10:25:16 +0800 Subject: [PATCH 21/28] AsyncConnection: ensure delay_state empty when building a new session Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 8 ++++++++ src/msg/async/AsyncConnection.h | 7 ++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 4f8c0e3157a12..f002be603a163 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1303,6 +1303,8 @@ ssize_t AsyncConnection::_process_connection() session_security.reset(); } + if (delay_state) + assert(delay_state->ready()); dispatch_queue->queue_connect(this); async_msgr->ms_deliver_handle_fast_connect(this); @@ -1478,6 +1480,8 @@ ssize_t AsyncConnection::_process_connection() state = STATE_OPEN; memset(&connect_msg, 0, sizeof(connect_msg)); + if (delay_state) + assert(delay_state->ready()); // make sure no pending tick timer center->delete_time_event(last_tick_id); last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); @@ -2427,6 +2431,8 @@ void AsyncConnection::DelayedDelivery::do_request(int id) { Mutex::Locker l(delay_lock); register_time_events.erase(id); + if (stop_dispatch) + return ; if (delay_queue.empty()) return ; utime_t release = delay_queue.front().first; @@ -2448,6 +2454,7 @@ void AsyncConnection::DelayedDelivery::do_request(int id) } void AsyncConnection::DelayedDelivery::flush() { + stop_dispatch = true; EventCenter::submit_to( center->get_id(), [this] () mutable { Mutex::Locker l(delay_lock); @@ -2463,6 +2470,7 @@ void AsyncConnection::DelayedDelivery::flush() { for (auto i : register_time_events) center->delete_time_event(i); register_time_events.clear(); + stop_dispatch = false; }, true); } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 1a458c873e18b..8bef56c38a05b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -143,12 +143,14 @@ class AsyncConnection : public Connection { EventCenter *center; DispatchQueue *dispatch_queue; uint64_t conn_id; + std::atomic_bool stop_dispatch; public: explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, DispatchQueue *q, uint64_t cid) : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), - msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { } + msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), + stop_dispatch(false) { } ~DelayedDelivery() { assert(register_time_events.empty()); assert(delay_queue.empty()); @@ -160,6 +162,7 @@ class AsyncConnection : public Connection { register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard() { + stop_dispatch = true; EventCenter::submit_to(center->get_id(), [this] () mutable { Mutex::Locker l(delay_lock); while (!delay_queue.empty()) { @@ -171,8 +174,10 @@ class AsyncConnection : public Connection { for (auto i : register_time_events) center->delete_time_event(i); register_time_events.clear(); + stop_dispatch = false; }, true); } + bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); } void flush(); } *delay_state; From 8907f52db228d0e2825d43fa30c63ede73fb36bc Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Fri, 27 May 2016 11:31:38 +0800 Subject: [PATCH 22/28] AsyncMessenger: remove extra release_worker path Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 14 +++++--------- src/msg/async/AsyncConnection.h | 6 +++--- src/msg/async/AsyncMessenger.cc | 33 ++++++++------------------------ src/msg/async/AsyncMessenger.h | 2 -- 4 files changed, 16 insertions(+), 39 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index f002be603a163..35809c8c116b2 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -119,9 +119,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) } AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, - EventCenter *c, PerfCounters *p) + Worker *w) : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), - logger(p), global_seq(0), connect_seq(0), peer_global_seq(0), + logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1), dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), @@ -130,7 +130,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu last_active(ceph::coarse_mono_clock::now()), inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), got_bad_auth(false), authorizer(NULL), replacing(false), - is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) + is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), + worker(w), center(&w->center) { read_handler = new C_handle_read(this); write_handler = new C_handle_write(this); @@ -2234,6 +2235,7 @@ void AsyncConnection::_stop() dispatch_queue->discard_queue(conn_id); discard_out_queue(); async_msgr->unregister_conn(this); + worker->release_worker(); state = STATE_CLOSED; open_write = false; @@ -2491,12 +2493,6 @@ void AsyncConnection::mark_down() _stop(); } -void AsyncConnection::release_worker() -{ - if (msgr) - reinterpret_cast(msgr)->release_worker(center); -} - void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { assert(write_lock.is_locked()); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 8bef56c38a05b..3f87d6886c648 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -37,6 +37,7 @@ using namespace std; #include "net_handler.h" class AsyncMessenger; +class Worker; static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); @@ -182,7 +183,7 @@ class AsyncConnection : public Connection { } *delay_state; public: - AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, EventCenter *c, PerfCounters *p); + AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w); ~AsyncConnection(); void maybe_start_delay_thread(); @@ -210,8 +211,6 @@ class AsyncConnection : public Connection { policy.lossy = true; } - void release_worker(); - private: enum { STATE_NONE, @@ -364,6 +363,7 @@ class AsyncConnection : public Connection { // used only by "read_until" uint64_t state_offset; NetHandler net; + Worker *worker; EventCenter *center; ceph::shared_ptr session_security; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 1dcd00dcd3dfe..7b859a0660e43 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -90,6 +90,10 @@ class Worker : public Thread { void *entry(); void stop(); PerfCounters *get_perf_counter() { return perf_logger; } + void release_worker() { + int oldref = references.fetch_sub(1); + assert(oldref > 0); + } }; /******************* @@ -357,7 +361,6 @@ class WorkerPool { virtual ~WorkerPool(); void start(); Worker *get_worker(); - void release_worker(EventCenter* c); int get_cpuid(int id) { if (coreids.empty()) return -1; @@ -495,21 +498,6 @@ Worker* WorkerPool::get_worker() return current_best; } -void WorkerPool::release_worker(EventCenter* c) -{ - ldout(cct, 10) << __func__ << dendl; - simple_spin_lock(&pool_spin); - for (auto p = workers.begin(); p != workers.end(); ++p) { - if (&((*p)->center) == c) { - ldout(cct, 10) << __func__ << " found worker, releasing" << dendl; - int oldref = (*p)->references.fetch_sub(1); - assert(oldref > 0); - break; - } - } - simple_spin_unlock(&pool_spin); -} - void WorkerPool::barrier() { ldout(cct, 10) << __func__ << " started." << dendl; @@ -543,8 +531,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, ceph_spin_init(&global_seq_lock); cct->lookup_or_create_singleton_object(pool, WorkerPool::name); local_worker = pool->get_worker(); - local_connection = new AsyncConnection( - cct, this, &dispatch_queue, &local_worker->center, local_worker->get_perf_counter()); + local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker); local_features = features; init_local_connection(); reap_handler = new C_handle_reap(this); @@ -678,7 +665,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd) { lock.Lock(); Worker *w = pool->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter()); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); conn->accept(sd); accepting_conns.insert(conn); lock.Unlock(); @@ -695,7 +682,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int // create connection Worker *w = pool->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter()); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); conn->connect(addr, type); assert(!conns.count(addr)); conns[addr] = conn; @@ -858,10 +845,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) Connection *AsyncMessenger::create_anon_connection() { Mutex::Locker l(lock); Worker *w = pool->get_worker(); - return new AsyncConnection(cct, - this, - &dispatch_queue, - &w->center, w->get_perf_counter()); + return new AsyncConnection(cct, this, &dispatch_queue, w); } int AsyncMessenger::get_proto_version(int peer_type, bool connect) @@ -893,7 +877,6 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) { Mutex::Locker l(deleted_lock); - conn->release_worker(); deleted_conns.insert(conn); if (deleted_conns.size() >= ReapDeadConnectionThreshold) { diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 61f8f3b76d128..8e65c14ff55e6 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -454,8 +454,6 @@ class AsyncMessenger : public SimplePolicyMessenger { * See "deleted_conns" */ int reap_dead(); - - void release_worker(EventCenter* c); /** * @} // AsyncMessenger Internals From 84e19b58d8e2110dc0d074727af2e3da23daff60 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sat, 4 Jun 2016 22:57:34 +0800 Subject: [PATCH 23/28] Event: avoid multi global conflict Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 7 +++++-- src/msg/async/Event.h | 17 +++++++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index b8d806ef356ba..752fcb5371d49 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -59,7 +59,6 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) << " time_id=" << time_event_next_id << ")."; } -EventCenter *EventCenter::centers[MAX_EVENTCENTER]; thread_local EventCenter* local_center = nullptr; int EventCenter::init(int n, unsigned i) @@ -143,7 +142,11 @@ EventCenter::~EventCenter() void EventCenter::set_owner() { - centers[idx] = local_center = this; + cct->lookup_or_create_singleton_object( + global_centers, "AsyncMessenger::EventCenter::global_center"); + assert(global_centers && !global_centers->centers[idx]); + global_centers->centers[idx] = local_center = this; + ldout(cct, 1) << __func__ << " idx=" << idx << " local_center=" << local_center << dendl; } int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index f692c71bbdbf0..184a7ca06e036 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -90,10 +90,17 @@ inline EventCenter* center() { * EventCenter maintain a set of file descriptor and handle registered events. */ class EventCenter { + using clock_type = ceph::coarse_mono_clock; // should be enough; static const int MAX_EVENTCENTER = 24; - static EventCenter *centers[MAX_EVENTCENTER]; + + struct AssociatedCenters { + EventCenter *centers[MAX_EVENTCENTER]; + AssociatedCenters(CephContext *c) { + memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*)); + } + }; struct FileEvent { int mask; @@ -125,6 +132,7 @@ class EventCenter { NetHandler net; EventCallbackRef notify_handler; unsigned idx = 10000; + AssociatedCenters *global_centers; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -194,9 +202,10 @@ class EventCenter { public: template - static void submit_to(int i, func &&f, bool nowait = false) { - assert(i < MAX_EVENTCENTER); - EventCenter *c = centers[i]; + void submit_to(int i, func &&f, bool nowait = false) { + assert(i < MAX_EVENTCENTER && global_centers); + EventCenter *c = global_centers->centers[i]; + assert(c); if (c->in_thread()) { f(); return ; From 4b5bb65b61985fcc9afa64cc76f184adbd3174e8 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sat, 28 May 2016 23:03:33 +0800 Subject: [PATCH 24/28] Event: change submit_to related to cephcontext Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 4 ++-- src/msg/async/AsyncConnection.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 35809c8c116b2..7e14ff5df45a7 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1834,7 +1834,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis dispatch_queue->queue_reset(this); existing->lock.Unlock(); - EventCenter::submit_to(existing->center->get_id(), [existing, connect, reply, authorizer_reply]() mutable { + center->submit_to(existing->center->get_id(), [existing, connect, reply, authorizer_reply]() mutable { Mutex::Locker l(existing->lock); if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { // handle error @@ -2457,7 +2457,7 @@ void AsyncConnection::DelayedDelivery::do_request(int id) void AsyncConnection::DelayedDelivery::flush() { stop_dispatch = true; - EventCenter::submit_to( + center->submit_to( center->get_id(), [this] () mutable { Mutex::Locker l(delay_lock); while (!delay_queue.empty()) { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3f87d6886c648..5769827b9d720 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -164,7 +164,7 @@ class AsyncConnection : public Connection { } void discard() { stop_dispatch = true; - EventCenter::submit_to(center->get_id(), [this] () mutable { + center->submit_to(center->get_id(), [this] () mutable { Mutex::Locker l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; From 1f65a355bed462b6ae2d4f1cab1a6a60f794e0ef Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Mon, 20 Jun 2016 00:03:03 +0800 Subject: [PATCH 25/28] msg/async/Event: don't execute inline if nonwait Signed-off-by: Haomai Wang --- src/msg/async/Event.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 184a7ca06e036..27bbca5b7155b 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -206,7 +206,7 @@ class EventCenter { assert(i < MAX_EVENTCENTER && global_centers); EventCenter *c = global_centers->centers[i]; assert(c); - if (c->in_thread()) { + if (!nowait && c->in_thread()) { f(); return ; } From be8dec5597c5227f45f82e38c0361a77ea57cf2a Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 26 Jun 2016 23:07:18 +0800 Subject: [PATCH 26/28] msg/async/AsyncConnection: avoid dup RETRYGLOBAL sending if two accept all going to dispatch RETRYGLOBAL sending, it will cause invalid connect side receiver. Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 7e14ff5df45a7..6f7a38d0acc1a 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1832,15 +1832,18 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis _stop(); // queue a reset on the new connection, which we're dumping for the old dispatch_queue->queue_reset(this); - existing->lock.Unlock(); - - center->submit_to(existing->center->get_id(), [existing, connect, reply, authorizer_reply]() mutable { + int new_fd = existing->sd; + center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable { Mutex::Locker l(existing->lock); + if (new_fd != existing->sd) + return ; + if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { // handle error existing->fault(); } }, true); + existing->lock.Unlock(); return 0; } From ada84e7c7d6a2b542ca96f4e077c03c05efe67f9 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 28 Jun 2016 12:11:13 +0800 Subject: [PATCH 27/28] msg/async/Event: change to use pthread_t to indicate whether the same thread thread_local has problem in TmapMigratePP.DataScan case that it will be assigned to zero unexpectedly. pthread_t is much cheaper than gettid since it's a library implementation. Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 7 +++---- src/msg/async/Event.h | 9 ++------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 752fcb5371d49..5e436d5a20c7b 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -59,8 +59,6 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) << " time_id=" << time_event_next_id << ")."; } -thread_local EventCenter* local_center = nullptr; - int EventCenter::init(int n, unsigned i) { // can't init multi times @@ -145,8 +143,9 @@ void EventCenter::set_owner() cct->lookup_or_create_singleton_object( global_centers, "AsyncMessenger::EventCenter::global_center"); assert(global_centers && !global_centers->centers[idx]); - global_centers->centers[idx] = local_center = this; - ldout(cct, 1) << __func__ << " idx=" << idx << " local_center=" << local_center << dendl; + global_centers->centers[idx] = this; + owner = pthread_self(); + ldout(cct, 1) << __func__ << " idx=" << idx << " owner=" << owner << dendl; } int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 27bbca5b7155b..7375e6d9fa501 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -80,12 +80,6 @@ class EventDriver { virtual int resize_events(int newsize) = 0; }; -extern thread_local EventCenter* local_center; - -inline EventCenter* center() { - return local_center; -} - /* * EventCenter maintain a set of file descriptor and handle registered events. */ @@ -119,6 +113,7 @@ class EventCenter { CephContext *cct; int nevent; // Used only to external event + pthread_t owner; std::mutex external_lock, file_lock;; std::atomic_ulong external_num_events; deque external_events; @@ -169,7 +164,7 @@ class EventCenter { // Used by external thread void dispatch_event_external(EventCallbackRef e); inline bool in_thread() const { - return local_center == this; + return pthread_equal(pthread_self(), owner); } private: From 0001a07354f17c433090ed2106dc8024cbe33efd Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 28 Jun 2016 22:33:25 +0800 Subject: [PATCH 28/28] Revert "msg/AsyncMessenger: move Worker class to cc file" This reverts commit 23d4488e376de8f1a5c369365d91021ee8c9a324. --- src/msg/async/AsyncMessenger.cc | 59 --------------------------------- src/msg/async/AsyncMessenger.h | 55 ++++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 61 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 7b859a0660e43..1a5e22aebd2fd 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -50,52 +50,6 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) { } -class Worker : public Thread { - static const uint64_t InitEventNumber = 5000; - static const uint64_t EventMaxWaitUs = 30000000; - CephContext *cct; - WorkerPool *pool; - bool done; - int id; - PerfCounters *perf_logger; - - public: - EventCenter center; - std::atomic_uint references; - Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { - center.init(InitEventNumber, i); - char name[128]; - sprintf(name, "AsyncMessenger::Worker-%d", id); - // initialize perf_logger - PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); - - plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); - plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); - plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); - plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); - plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); - - perf_logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(perf_logger); - } - ~Worker() { - if (perf_logger) { - cct->get_perfcounters_collection()->remove(perf_logger); - delete perf_logger; - } - } - void *entry(); - void stop(); - PerfCounters *get_perf_counter() { return perf_logger; } - void release_worker() { - int oldref = references.fetch_sub(1); - assert(oldref > 0); - } -}; - /******************* * Processor */ @@ -875,15 +829,6 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) return 0; } -void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) { - Mutex::Locker l(deleted_lock); - deleted_conns.insert(conn); - - if (deleted_conns.size() >= ReapDeadConnectionThreshold) { - local_worker->center.dispatch_event_external(reap_handler); - } -} - void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { // be careful here: multiple threads may block here, and readers of @@ -928,7 +873,3 @@ int AsyncMessenger::reap_dead() return num; } - -void AsyncMessenger::release_worker(EventCenter* c) { - pool->release_worker(c); -} diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 8e65c14ff55e6..9ac8a1a5c66b8 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -56,7 +56,51 @@ enum { }; -class Worker; +class Worker : public Thread { + static const uint64_t InitEventNumber = 5000; + static const uint64_t EventMaxWaitUs = 30000000; + CephContext *cct; + WorkerPool *pool; + bool done; + int id; + PerfCounters *perf_logger; + + public: + EventCenter center; + std::atomic_uint references; + Worker(CephContext *c, WorkerPool *p, int i) + : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { + center.init(InitEventNumber, i); + char name[128]; + sprintf(name, "AsyncMessenger::Worker-%d", id); + // initialize perf_logger + PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); + + plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); + plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); + plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); + plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); + plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); + } + ~Worker() { + if (perf_logger) { + cct->get_perfcounters_collection()->remove(perf_logger); + delete perf_logger; + } + } + void *entry(); + void stop(); + PerfCounters *get_perf_counter() { return perf_logger; } + void release_worker() { + int oldref = references.fetch_sub(1); + assert(oldref > 0); + } +}; /** * If the Messenger binds to a specific address, the Processor runs @@ -444,7 +488,14 @@ class AsyncMessenger : public SimplePolicyMessenger { * * See "deleted_conns" */ - void unregister_conn(AsyncConnectionRef conn); + void unregister_conn(AsyncConnectionRef conn) { + Mutex::Locker l(deleted_lock); + deleted_conns.insert(conn); + + if (deleted_conns.size() >= ReapDeadConnectionThreshold) { + local_worker->center.dispatch_event_external(reap_handler); + } + } /** * Reap dead connection from `deleted_conns`