Skip to content

Commit

Permalink
ae.c event loop does no longer support exception notifications, as th…
Browse files Browse the repository at this point in the history
…ey are fully pointless. Also a theoretical bug that never happens in practice fixed.
  • Loading branch information
antirez committed Jan 20, 2010
1 parent 8e68871 commit 621d5c1
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 25 deletions.
18 changes: 8 additions & 10 deletions ae.c
Expand Up @@ -93,7 +93,6 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
if (mask & AE_EXCEPTION) fe->efileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
Expand Down Expand Up @@ -325,18 +324,19 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE)
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
if (fe->mask & mask & AE_WRITABLE && fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
if (fe->mask & mask & AE_EXCEPTION &&
fe->efileProc != fe->wfileProc &&
fe->efileProc != fe->rfileProc)
fe->efileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
Expand All @@ -362,11 +362,9 @@ int aeWait(int fd, int mask, long long milliseconds) {

if (mask & AE_READABLE) FD_SET(fd,&rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&wfds);
if (mask & AE_EXCEPTION) FD_SET(fd,&efds);
if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) {
if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE;
if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE;
if (FD_ISSET(fd,&efds)) retmask |= AE_EXCEPTION;
return retmask;
} else {
return retval;
Expand Down
4 changes: 1 addition & 3 deletions ae.h
Expand Up @@ -41,7 +41,6 @@
#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2
#define AE_EXCEPTION 4

#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
Expand All @@ -62,10 +61,9 @@ typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientDat

/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
aeFileProc *efileProc;
void *clientData;
} aeFileEvent;

Expand Down
3 changes: 0 additions & 3 deletions ae_epoll.c
Expand Up @@ -38,7 +38,6 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
if (mask & AE_EXCEPTION) ee.events |= EPOLLPRI;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
Expand All @@ -53,7 +52,6 @@ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
if (mask & AE_EXCEPTION) ee.events |= EPOLLPRI;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (mask != AE_NONE) {
Expand Down Expand Up @@ -81,7 +79,6 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLPRI) mask |= AE_EXCEPTION;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
Expand Down
12 changes: 3 additions & 9 deletions ae_select.c
Expand Up @@ -5,10 +5,10 @@
#include <string.h>

typedef struct aeApiState {
fd_set rfds, wfds, efds;
fd_set rfds, wfds;
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */
fd_set _rfds, _wfds, _efds;
fd_set _rfds, _wfds;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
Expand All @@ -17,7 +17,6 @@ static int aeApiCreate(aeEventLoop *eventLoop) {
if (!state) return -1;
FD_ZERO(&state->rfds);
FD_ZERO(&state->wfds);
FD_ZERO(&state->efds);
eventLoop->apidata = state;
return 0;
}
Expand All @@ -31,7 +30,6 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {

if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
if (mask & AE_EXCEPTION) FD_SET(fd,&state->efds);
return 0;
}

Expand All @@ -40,7 +38,6 @@ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {

if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
if (mask & AE_EXCEPTION) FD_CLR(fd,&state->efds);
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
Expand All @@ -49,10 +46,9 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {

memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
memcpy(&state->_efds,&state->efds,sizeof(fd_set));

retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,&state->_efds,tvp);
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
Expand All @@ -63,8 +59,6 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
if (fe->mask & AE_EXCEPTION && FD_ISSET(j,&state->_efds))
mask |= AE_EXCEPTION;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
Expand Down
9 changes: 9 additions & 0 deletions redis-cli.c
Expand Up @@ -134,6 +134,7 @@ static struct redisCommand cmdTable[] = {
{"debug",-2,REDIS_CMD_INLINE},
{"mset",-3,REDIS_CMD_MULTIBULK},
{"msetnx",-3,REDIS_CMD_MULTIBULK},
{"monitor",1,REDIS_CMD_INLINE},
{NULL,0,0}
};

Expand Down Expand Up @@ -188,6 +189,7 @@ static int cliReadSingleLineReply(int fd, int quiet) {
if (reply == NULL) return 1;
if (!quiet)
printf("%s\n", reply);
sdsfree(reply);
return 0;
}

Expand Down Expand Up @@ -287,6 +289,7 @@ static int selectDb(int fd)
static int cliSendCommand(int argc, char **argv) {
struct redisCommand *rc = lookupCommand(argv[0]);
int fd, j, retval = 0;
int read_forever = 0;
sds cmd;

if (!rc) {
Expand All @@ -299,6 +302,7 @@ static int cliSendCommand(int argc, char **argv) {
fprintf(stderr,"Wrong number of arguments for '%s'\n",rc->name);
return 1;
}
if (!strcasecmp(rc->name,"monitor")) read_forever = 1;
if ((fd = cliConnect()) == -1) return 1;

/* Select db number */
Expand Down Expand Up @@ -337,6 +341,11 @@ static int cliSendCommand(int argc, char **argv) {
}
anetWrite(fd,cmd,sdslen(cmd));
sdsfree(cmd);

while (read_forever) {
cliReadSingleLineReply(fd,0);
}

retval = cliReadReply(fd);
if (retval) {
close(fd);
Expand Down

0 comments on commit 621d5c1

Please sign in to comment.