Skip to content

Commit

Permalink
Merge pull request #11430 from wjwithagen/wip-wjw-freebsd-EventKqueue
Browse files Browse the repository at this point in the history
FreeBSD/EventKqueue.{h,cc} Added code to restore events on (thread)fork

Reviewed-by: Haomai Wang <haomai@xsky.com>
  • Loading branch information
yuyuyu101 committed Oct 16, 2016
2 parents b0c8651 + 9eeb491 commit 21d5f49
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 26 deletions.
183 changes: 161 additions & 22 deletions src/msg/async/EventKqueue.cc
Expand Up @@ -22,75 +22,193 @@
#undef dout_prefix
#define dout_prefix *_dout << "KqueueDriver."

#define KEVENT_NOWAIT 0

int KqueueDriver::test_kqfd() {
struct kevent ke[1];
if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) {
ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd
<< cpp_strerror(errno) << dendl;
return -errno;
}
return kqfd;
}

int KqueueDriver::restore_events() {
struct kevent ke[2];
int i;

ldout(cct,10) << __func__ << " on kqfd = " << kqfd << dendl;
for(i=0;i<size;i++) {
int num = 0;
if (sav_events[i].mask == 0 )
continue;
ldout(cct,10) << __func__ << " restore kqfd = " << kqfd
<< " fd = " << i << " mask " << sav_events[i].mask << dendl;
if (sav_events[i].mask & EVENT_READABLE)
EV_SET(&ke[num++], i, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (sav_events[i].mask & EVENT_WRITABLE)
EV_SET(&ke[num++], i, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
if (num) {
if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
ldout(cct,0) << __func__ << " unable to add event: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
}
}
return 0;
}

int KqueueDriver::test_thread_change(const char* funcname) {
// check to see if we changed thread, because that invalidates
// the kqfd and we need to restore that
int oldkqfd = kqfd;

if (!pthread_equal(mythread, pthread_self())) {
ldout(cct,10) << funcname << " We changed thread from " << mythread
<< " to " << pthread_self() << dendl;
mythread = pthread_self();
kqfd = -1;
} else if ((kqfd != -1) && (test_kqfd() < 0)) {
// should this ever happen?
// It would be strange to change kqfd with thread change.
// Might nee to change this into an assert() in the future.
ldout(cct,0) << funcname << " Warning: Recreating old kqfd. "
<< "This should not happen!!!" << dendl;
kqfd = -1;
}
if (kqfd == -1) {
kqfd = kqueue();
ldout(cct,10) << funcname << " kqueue: new kqfd = " << kqfd
<< " (was: " << oldkqfd << ")"
<< dendl;
if (kqfd < 0) {
lderr(cct) << funcname << " unable to do kqueue: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
if (restore_events()< 0) {
lderr(cct) << funcname << " unable restore all events "
<< cpp_strerror(errno) << dendl;
return -errno;
}
}
return 0;
}

int KqueueDriver::init(int nevent)
{
events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
if (!events) {
// keep track of possible changes of our thread
// because change of thread kills the kqfd
mythread = pthread_self();

// Reserve the space to accept the kevent return events.
res_events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
if (!res_events) {
lderr(cct) << __func__ << " unable to malloc memory: "
<< cpp_strerror(errno) << dendl;
return -ENOMEM;
}
memset(events, 0, sizeof(struct kevent)*nevent);
memset(res_events, 0, sizeof(struct kevent)*nevent);
size = nevent;

kqfd = kqueue();
if (kqfd < 0) {
lderr(cct) << __func__ << " unable to do kqueue: "
// Reserve the space to keep all of the events set, so it can be redone
// when we change trhread ID.
sav_events = (struct SaveEvent*)malloc(sizeof(struct SaveEvent)*nevent);
if (!sav_events) {
lderr(cct) << __func__ << " unable to malloc memory: "
<< cpp_strerror(errno) << dendl;
return -errno;
return -ENOMEM;
}
memset(sav_events, 0, sizeof(struct SaveEvent)*nevent);
sav_max = nevent;

size = nevent;

// Delay assigning a descriptor until it is really needed.
// kqfd = kqueue();
kqfd = -1;
return 0;
}

int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
{
ldout(cct, 20) << __func__ << " add event fd=" << fd << " cur_mask=" << cur_mask
<< "add_mask" << add_mask << dendl;
struct kevent ke[2];
int num = 0;

ldout(cct,20) << __func__ << " add event kqfd = " << kqfd << " fd = " << fd
<< " cur_mask = " << cur_mask << " add_mask = " << add_mask
<< dendl;

int r = test_thread_change(__func__);
if ( r < 0 )
return r;

if (add_mask & EVENT_READABLE)
EV_SET(&ke[num++], fd, EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, NULL);
if (add_mask & EVENT_WRITABLE)
EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_ADD|EV_CLEAR, 0, 0, NULL);

if (num) {
if (kevent(kqfd, ke, num, NULL, 0, NULL) == -1) {
if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
lderr(cct) << __func__ << " unable to add event: "
<< cpp_strerror(errno) << dendl;
return -errno;
}
}

// keep what we set
if (fd >= sav_max)
resize_events(sav_max+5000);
sav_events[fd].mask = cur_mask | add_mask;
return 0;
}

int KqueueDriver::del_event(int fd, int cur_mask, int delmask)
int KqueueDriver::del_event(int fd, int cur_mask, int del_mask)
{
ldout(cct, 20) << __func__ << " del event fd=" << fd << " cur mask=" << cur_mask
<< " delmask=" << delmask << dendl;
struct kevent ke[2];
int num = 0;
int mask = cur_mask & delmask;
int mask = cur_mask & del_mask;

ldout(cct,20) << __func__ << " delete event kqfd = " << kqfd
<< " fd = " << fd << " cur_mask = " << cur_mask
<< " del_mask = " << del_mask << dendl;

int r = test_thread_change(__func__);
if ( r < 0 )
return r;

if (mask & EVENT_READABLE)
EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
if (mask & EVENT_WRITABLE)
EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);

if (num) {
int r = 0;
if ((r = kevent(kqfd, ke, num, NULL, 0, NULL)) < 0) {
if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) {
lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask
<< " failed." << cpp_strerror(errno) << dendl;
return -errno;
}
}
// keep the administration
sav_events[fd].mask = cur_mask & ~del_mask;
return 0;
}

int KqueueDriver::resize_events(int newsize)
{
ldout(cct,10) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize
<< dendl;
if(newsize > sav_max) {
sav_events = (struct SaveEvent*)realloc( sav_events,
sizeof(struct SaveEvent)*newsize);
if (!sav_events) {
lderr(cct) << __func__ << " unable to realloc memory: "
<< cpp_strerror(errno) << dendl;
return -ENOMEM;
}
memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max));
sav_max = newsize;
}
return 0;
}

Expand All @@ -99,22 +217,43 @@ int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeva
int retval, numevents = 0;
struct timespec timeout;

ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl;

int r = test_thread_change(__func__);
if ( r < 0 )
return r;

if (tvp != NULL) {
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
retval = kevent(kqfd, NULL, 0, events, size, &timeout);
ldout(cct,20) << __func__ << " "
<< timeout.tv_sec << " sec "
<< timeout.tv_nsec << " nsec"
<< dendl;
retval = kevent(kqfd, NULL, 0, res_events, size, &timeout);
} else {
retval = kevent(kqfd, NULL, 0, events, size, NULL);
ldout(cct,20) << __func__ << " event_wait: " << " NULL" << dendl;
retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT);
}

if (retval > 0) {
ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl;
if (retval < 0) {
lderr(cct) << __func__ << " kqueue error: "
<< cpp_strerror(errno) << dendl;
return -errno;
} else if (retval == 0) {
ldout(cct,5) << __func__ << " Hit timeout("
<< timeout.tv_sec << " sec "
<< timeout.tv_nsec << " nsec"
<< ")." << dendl;
} else {
int j;

numevents = retval;
fired_events.resize(numevents);
for (j = 0; j < numevents; j++) {
int mask = 0;
struct kevent *e = events + j;
struct kevent *e = res_events + j;

if (e->filter == EVFILT_READ) mask |= EVENT_READABLE;
if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE;
Expand Down
25 changes: 21 additions & 4 deletions src/msg/async/EventKqueue.h
Expand Up @@ -25,18 +25,35 @@

class KqueueDriver : public EventDriver {
int kqfd;
struct kevent *events;
pthread_t mythread;
struct kevent *res_events;
CephContext *cct;
int size;

// Keep what we set on the kqfd
struct SaveEvent{
int fd;
int mask;
};
struct SaveEvent *sav_events;
int sav_max;
int restore_events();
int test_kqfd();
int test_thread_change(const char* funcname);

public:
explicit KqueueDriver(CephContext *c): kqfd(-1), events(NULL), cct(c), size(0) {}
explicit KqueueDriver(CephContext *c): kqfd(-1), res_events(NULL), cct(c),
size(0), sav_max(0) {}
virtual ~KqueueDriver() {
if (kqfd != -1)
close(kqfd);

if (events)
free(events);
if (res_events)
free(res_events);
size = 0;
if (sav_events)
free(sav_events);
sav_max = 0;
}

int init(int nevent) override;
Expand Down

0 comments on commit 21d5f49

Please sign in to comment.