Skip to content

Commit

Permalink
Merge pull request #7453 from ceph/wip-14566
Browse files Browse the repository at this point in the history
msg/async: fix potential race condition
  • Loading branch information
liewegas committed Feb 4, 2016
2 parents 300106a + 2292dd9 commit 0e42ee9
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 36 deletions.
14 changes: 14 additions & 0 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -2002,6 +2002,7 @@ void AsyncConnection::accept(int incoming)
ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
assert(sd < 0);

Mutex::Locker l(lock);
sd = incoming;
state = STATE_ACCEPTING;
center->create_file_event(sd, EVENT_READABLE, read_handler);
Expand Down Expand Up @@ -2566,3 +2567,16 @@ void AsyncConnection::local_deliver()
write_lock.Lock();
}
}

void AsyncConnection::cleanup_handler()
{
ldout(async_msgr->cct, 1) << __func__ << dendl;

delete read_handler;
delete write_handler;
delete reset_handler;
delete remote_reset_handler;
delete connect_handler;
delete local_deliver_handler;
delete wakeup_handler;
}
10 changes: 1 addition & 9 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -315,15 +315,7 @@ class AsyncConnection : public Connection {
lock.Unlock();
mark_down();
}
void cleanup_handler() {
delete read_handler;
delete write_handler;
delete reset_handler;
delete remote_reset_handler;
delete connect_handler;
delete local_deliver_handler;
delete wakeup_handler;
}
void cleanup_handler();
PerfCounters *get_perf_counter() {
return logger;
}
Expand Down
3 changes: 2 additions & 1 deletion src/msg/async/AsyncMessenger.cc
Expand Up @@ -286,7 +286,7 @@ void *Worker::entry()
}
}

center.set_owner(pthread_self());
center.set_owner();
while (!done) {
ldout(cct, 20) << __func__ << " calling event process" << dendl;

Expand Down Expand Up @@ -741,6 +741,7 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)

int AsyncMessenger::reap_dead()
{
ldout(cct, 1) << __func__ << " start" << dendl;
int num = 0;

Mutex::Locker l1(lock);
Expand Down
65 changes: 43 additions & 22 deletions src/msg/async/Event.cc
Expand Up @@ -62,6 +62,8 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout)
<< " time_id=" << time_event_next_id << ").";
}

static thread_local pthread_t thread_id = 0;

int EventCenter::init(int n)
{
// can't init multi times
Expand Down Expand Up @@ -126,6 +128,12 @@ EventCenter::~EventCenter()
free(file_events);
}


void EventCenter::set_owner()
{
thread_id = owner = pthread_self();
}

int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
{
int r = 0;
Expand Down Expand Up @@ -334,15 +342,21 @@ int EventCenter::process_events(int timeout_microseconds)
int numevents;
bool trigger_time = false;

utime_t period, shortest, now = ceph_clock_now(cct);
now.copy_to_timeval(&tv);
if (timeout_microseconds > 0) {
tv.tv_sec += timeout_microseconds / 1000000;
tv.tv_usec += timeout_microseconds % 1000000;
}
shortest.set_from_timeval(&tv);
utime_t now = ceph_clock_now(cct);;
// If exists external events, don't block
if (external_num_events.read()) {
tv.tv_sec = 0;
tv.tv_usec = 0;
next_time = now;
} else {
utime_t period, shortest;
now.copy_to_timeval(&tv);
if (timeout_microseconds > 0) {
tv.tv_sec += timeout_microseconds / 1000000;
tv.tv_usec += timeout_microseconds % 1000000;
}
shortest.set_from_timeval(&tv);

{
Mutex::Locker l(time_lock);
map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
if (it != time_events.end() && shortest >= it->first) {
Expand All @@ -360,11 +374,11 @@ int EventCenter::process_events(int timeout_microseconds)
tv.tv_sec = timeout_microseconds / 1000000;
tv.tv_usec = timeout_microseconds % 1000000;
}
next_time = shortest;
}

ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
vector<FiredFileEvent> fired_events;
next_time = shortest;
numevents = driver->event_wait(fired_events, &tv);
file_lock.Lock();
for (int j = 0; j < numevents; j++) {
Expand Down Expand Up @@ -402,18 +416,21 @@ int EventCenter::process_events(int timeout_microseconds)
if (trigger_time)
numevents += process_time_events();

external_lock.Lock();
if (external_events.empty()) {
external_lock.Unlock();
} else {
deque<EventCallbackRef> cur_process;
cur_process.swap(external_events);
external_lock.Unlock();
while (!cur_process.empty()) {
EventCallbackRef e = cur_process.front();
if (e)
e->do_request(0);
cur_process.pop_front();
if (external_num_events.read()) {
external_lock.Lock();
if (external_events.empty()) {
external_lock.Unlock();
} else {
deque<EventCallbackRef> cur_process;
cur_process.swap(external_events);
external_num_events.set(0);
external_lock.Unlock();
while (!cur_process.empty()) {
EventCallbackRef e = cur_process.front();
if (e)
e->do_request(0);
cur_process.pop_front();
}
}
}
return numevents;
Expand All @@ -423,6 +440,10 @@ void EventCenter::dispatch_event_external(EventCallbackRef e)
{
external_lock.Lock();
external_events.push_back(e);
uint64_t num = external_num_events.inc();
external_lock.Unlock();
wakeup();
if (thread_id != owner)
wakeup();

ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl;
}
4 changes: 3 additions & 1 deletion src/msg/async/Event.h
Expand Up @@ -103,6 +103,7 @@ class EventCenter {
int nevent;
// Used only to external event
Mutex external_lock, file_lock, time_lock;
atomic_t external_num_events;
deque<EventCallbackRef> external_events;
FileEvent *file_events;
EventDriver *driver;
Expand Down Expand Up @@ -132,6 +133,7 @@ class EventCenter {
external_lock("AsyncMessenger::external_lock"),
file_lock("AsyncMessenger::file_lock"),
time_lock("AsyncMessenger::time_lock"),
external_num_events(0),
file_events(NULL),
driver(NULL), time_event_next_id(1),
notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) {
Expand All @@ -141,7 +143,7 @@ class EventCenter {
ostream& _event_prefix(std::ostream *_dout);

int init(int nevent);
void set_owner(pthread_t p) { owner = p; }
void set_owner();
pthread_t get_owner() { return owner; }

// Used by internal thread
Expand Down
2 changes: 1 addition & 1 deletion src/test/msgr/test_async_driver.cc
Expand Up @@ -280,7 +280,7 @@ class Worker : public Thread {
center.wakeup();
}
void* entry() {
center.set_owner(pthread_self());
center.set_owner();
while (!done)
center.process_events(1000000);
return 0;
Expand Down
4 changes: 2 additions & 2 deletions src/test/perf_local.cc
Expand Up @@ -451,7 +451,7 @@ double eventcenter_poll()
int count = 1000000;
EventCenter center(g_ceph_context);
center.init(1000);
center.set_owner(pthread_self());
center.set_owner();
uint64_t start = Cycles::rdtsc();
for (int i = 0; i < count; i++) {
center.process_events(0);
Expand All @@ -474,7 +474,7 @@ class CenterWorker : public Thread {
center.wakeup();
}
void* entry() {
center.set_owner(pthread_self());
center.set_owner();
bind_thread_to_cpu(2);
while (!done)
center.process_events(1000);
Expand Down

0 comments on commit 0e42ee9

Please sign in to comment.