Skip to content

Commit

Permalink
PROTON-1574: Fix lock-order-inversion warnings
Browse files Browse the repository at this point in the history
The lock introduced in

    17d2a6f PROTON-1568: c++ enable race detection for self-tests

created lock-order problems.

Using the lock as a simple memory barrier solves the original race and the
lock-order problem.
  • Loading branch information
alanconway committed Sep 6, 2017
1 parent 5f4d852 commit eac4e31
Showing 1 changed file with 27 additions and 40 deletions.
67 changes: 27 additions & 40 deletions proton-c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,25 @@ typedef enum {

// Per-descriptor record handed to epoll: start_polling()/rearm() store its
// address in epoll_event.data.ptr and proactor_do_epoll() casts it back.
typedef struct epoll_extended_t {
/* Original rationale for locking: epoll_ctl()/epoll_wait() are not assumed to
form a memory barrier, so cached memory writes to struct epoll_extended_t in
the EPOLL_ADD thread might not be visible to the epoll_wait() thread.
NOTE(review): the commit description says this per-struct lock created
lock-order inversions and that barrier_mutex (last field) supersedes it --
confirm whether `mutex` is still required.
*/
pmutex mutex;
struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor
int fd; // descriptor passed to epoll_ctl(); stop_polling() resets it to -1
epoll_type_t type; // io/timer/wakeup
uint32_t wanted; // events to poll for
bool polling; // set by start_polling(), cleared by stop_polling()
pmutex barrier_mutex; // only ever locked and immediately unlocked, by memory_barrier()
} epoll_extended_t;

/* Make writes to an epoll_extended_t visible across threads.
 *
 * epoll_ctl()/epoll_wait() are not relied upon to form a memory barrier, so
 * fields written by the thread that registers the descriptor might otherwise
 * not be visible to the thread returning from epoll_wait(). Call this before
 * epoll_ctl() and after epoll_wait().
 */
static void memory_barrier(epoll_extended_t *ee) {
  // The mutex guards no data: taking and releasing it is done purely for
  // the memory-barrier side effect of lock/unlock.
  pmutex *barrier = &ee->barrier_mutex;
  lock(barrier);
  unlock(barrier);
}

/*
* This timerfd logic assumes EPOLLONESHOT and there never being two
* active timeout callbacks. There can be multiple (or zero)
Expand Down Expand Up @@ -282,33 +289,26 @@ PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
/* Register ee->fd with the epoll instance for ee->wanted events (one-shot).
 *
 * Returns false when ee is already marked as polling or when epoll_ctl()
 * fails, true once the descriptor has been added. Note that ee->polling is
 * set before the epoll_ctl() call and is left set even if that call fails.
 *
 * No per-struct lock is taken: the fields are published to the epoll_wait()
 * thread with memory_barrier() immediately before epoll_ctl().
 * NOTE(review): initialization of ee->barrier_mutex is not visible in this
 * part of the file -- confirm it is initialized before the first call.
 */
static bool start_polling(epoll_extended_t *ee, int epollfd) {
  if (ee->polling)
    return false;
  ee->polling = true;
  struct epoll_event ev;
  ev.data.ptr = ee;                       // handed back by epoll_wait()
  ev.events = ee->wanted | EPOLLONESHOT;  // must be re-armed after each event
  memory_barrier(ee);                     // publish ee's fields before registering
  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
}

/* Remove ee->fd from the epoll instance and mark ee as no longer polling.
 *
 * Does nothing when ee has no descriptor (fd == -1), is not currently
 * polling, or epollfd is -1. A failing EPOLL_CTL_DEL is reported through
 * EPOLL_FATAL. On return from the normal path ee->fd is -1 and ee->polling
 * is false.
 *
 * No lock is held here, so the early return cannot leave a mutex locked;
 * memory_barrier() is used before epoll_ctl() instead.
 */
static void stop_polling(epoll_extended_t *ee, int epollfd) {
  // TODO: check for error, return bool or just log?
  if (ee->fd == -1 || !ee->polling || epollfd == -1)
    return;
  struct epoll_event ev;
  ev.data.ptr = ee;
  ev.events = 0;
  memory_barrier(ee);
  if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1)
    EPOLL_FATAL("EPOLL_CTL_DEL", errno);
  ee->fd = -1;
  ee->polling = false;
}

/*
Expand Down Expand Up @@ -642,11 +642,9 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
/* Re-arm a one-shot registration: request ee->wanted events again for ee->fd
 * on p->epollfd via EPOLL_CTL_MOD. A failing epoll_ctl() is reported through
 * EPOLL_FATAL.
 *
 * epoll_ctl() is issued exactly once, after memory_barrier() has published
 * ee's fields; no per-struct lock is taken.
 */
static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
  struct epoll_event ev;
  ev.data.ptr = ee;
  ev.events = ee->wanted | EPOLLONESHOT;
  memory_barrier(ee);
  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
    EPOLL_FATAL("arming polled file descriptor", errno);
}

Expand Down Expand Up @@ -1185,11 +1183,9 @@ static void pconnection_start(pconnection_t *pc) {

start_polling(&pc->timer.epoll_io, efd); // TODO: check for error
epoll_extended_t *ee = &pc->psocket.epoll_io;
lock(&ee->mutex);
ee->fd = pc->psocket.sockfd;
ee->wanted = EPOLLIN | EPOLLOUT;
ee->polling = false;
unlock(&ee->mutex);
start_polling(ee, efd); // TODO: check for error
}

Expand Down Expand Up @@ -1256,6 +1252,8 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)

lock(&pc->context.mutex);
proactor_add(&pc->context);
pn_connection_open(pc->driver.connection); /* Auto-open */

bool notify = false;
bool notify_proactor = false;

Expand Down Expand Up @@ -1621,14 +1619,11 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {

/* Set up an epoll_extended_t to be used for wakeup or interrupts.
 *
 * Fills in ee for the given eventfd (type WAKE, waiting for EPOLLIN, no
 * associated psocket) and registers it on epollfd through start_polling().
 *
 * The fields are written without taking a per-struct lock: start_polling()
 * calls memory_barrier() before epoll_ctl(), and this function no longer
 * initializes a mutex that start_polling() might initialize again.
 */
static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
  ee->psocket = NULL;   // NULL -> event belongs to the proactor itself
  ee->fd = eventfd;
  ee->type = WAKE;
  ee->wanted = EPOLLIN;
  ee->polling = false;  // start_polling() refuses to register if this is true
  start_polling(ee, epollfd);  // TODO: check for error
}

Expand All @@ -1640,7 +1635,6 @@ pn_proactor_t *pn_proactor() {
pmutex_init(&p->eventfd_mutex);
pmutex_init(&p->bind_mutex);
ptimer_init(&p->timer, 0);
pmutex_init(&p->overflow_mutex);

if ((p->epollfd = epoll_create(1)) >= 0) {
if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
Expand Down Expand Up @@ -1822,11 +1816,7 @@ static bool proactor_remove(pcontext_t *ctx) {
}

static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
lock(&ee->mutex);
int fd = ee->fd;
unlock(&ee->mutex);

if (fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
(void)read_uint64(p->interruptfd);
rearm(p, &p->epoll_interrupt);
return proactor_process(p, PN_PROACTOR_INTERRUPT);
Expand Down Expand Up @@ -1871,28 +1861,25 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
}
assert(n == 1);
epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
memory_barrier(ee);

lock(&ee->mutex);
epoll_type_t type = ee->type;
psocket_t* psocket = ee->psocket;
unlock(&ee->mutex);
if (type == WAKE) {
if (ee->type == WAKE) {
batch = process_inbound_wake(p, ee);
} else if (type == PROACTOR_TIMER) {
} else if (ee->type == PROACTOR_TIMER) {
batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
} else {
pconnection_t *pc = psocket_pconnection(psocket);
pconnection_t *pc = psocket_pconnection(ee->psocket);
if (pc) {
if (type == PCONNECTION_IO) {
if (ee->type == PCONNECTION_IO) {
batch = pconnection_process(pc, ev.events, false, false);
} else {
assert(type == PCONNECTION_TIMER);
assert(ee->type == PCONNECTION_TIMER);
batch = pconnection_process(pc, 0, true, false);
}
}
else {
// TODO: can any of the listener processing be parallelized like IOCP?
batch = listener_process(psocket, ev.events);
batch = listener_process(ee->psocket, ev.events);
}
}

Expand Down

0 comments on commit eac4e31

Please sign in to comment.