Skip to content
This repository has been archived by the owner on Feb 20, 2021. It is now read-only.

Commit

Permalink
Merge commit '994a7b5' into bksavecow
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryRawas committed Jul 10, 2012
2 parents f6665eb + 994a7b5 commit 138eb25
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 30 deletions.
166 changes: 140 additions & 26 deletions src/ae_wsiocp.c
Expand Up @@ -57,6 +57,7 @@ typedef struct aeApiState {
int setsize;
OVERLAPPED_ENTRY entries[MAX_COMPLETE_PER_POLL];
list lookup[MAX_SOCKET_LOOKUP];
list closing;
} aeApiState;

/* convert socket value to an index
Expand All @@ -65,7 +66,7 @@ int aeSocketIndex(int fd) {
return fd % MAX_SOCKET_LOOKUP;
}

/* get data for socket / fd being monitored */
/* get data for socket / fd being monitored. Create if not found*/
aeSockState *aeGetSockState(void *apistate, int fd) {
int sindex;
listNode *node;
Expand All @@ -87,6 +88,11 @@ aeSockState *aeGetSockState(void *apistate, int fd) {
sockState = (aeSockState *) zmalloc(sizeof(aeSockState));
if (sockState != NULL) {
sockState->fd = fd;
sockState->masks = 0;
sockState->wreqs = 0;
sockState->reqs = NULL;
memset(&sockState->wreqlist, 0, sizeof(sockState->wreqlist));

if (listAddNodeHead(socklist, sockState) != NULL) {
return sockState;
} else {
Expand All @@ -97,22 +103,72 @@ aeSockState *aeGetSockState(void *apistate, int fd) {
}

/* get data for socket / fd being monitored */
void aeDelSockState(void *apistate, aeSockState *sockState) {
aeSockState *aeGetExistingSockState(void *apistate, int fd) {
int sindex;
listNode *node;
list *socklist;
if (apistate == NULL) return;
aeSockState *sockState;
if (apistate == NULL) return NULL;

sindex = aeSocketIndex(sockState->fd);
sindex = aeSocketIndex(fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]);
node = listFirst(socklist);
while (node != NULL) {
if ((aeSockState *)listNodeValue(node) == sockState) {
sockState = (aeSockState *)listNodeValue(node);
if (sockState->fd == fd) {
return sockState;
}
node = listNextNode(node);
}

return NULL;
}

// find matching value in list and remove. If found return 1
int removeMatchFromList(list *socklist, void *value) {
listNode *node;
node = listFirst(socklist);
while (node != NULL) {
if (listNodeValue(node) == value) {
listDelNode(socklist, node);
return;
return 1;
}
node = listNextNode(node);
}
return 0;
}

/* delete data for socket / fd being monitored */
void aeDelSockState(void *apistate, aeSockState *sockState) {
int sindex;
list *socklist;

if (apistate == NULL) return;

if (sockState->wreqs == 0 && (sockState->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) {
// see if in active list
sindex = aeSocketIndex(sockState->fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]);
if (removeMatchFromList(socklist, sockState) == 1) {
zfree(sockState);
return;
}
// try closing list
socklist = &(((aeApiState *)apistate)->closing);
if (removeMatchFromList(socklist, sockState) == 1) {
zfree(sockState);
return;
}
} else {
// not safe to delete. Move to closing
sindex = aeSocketIndex(sockState->fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]);
if (removeMatchFromList(socklist, sockState) == 1) {
// removed from active list. add to closing list
socklist = &(((aeApiState *)apistate)->closing);
listAddNodeHead(socklist, sockState);
}
}
}

/* Called by ae to initialize state */
Expand Down Expand Up @@ -191,6 +247,7 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
return -1;
}
sockstate->wreqs++;
listAddNodeTail(&sockstate->wreqlist, areq);
}
}
return 0;
Expand Down Expand Up @@ -235,18 +292,39 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
&state->entries[0].lpCompletionKey,
&state->entries[0].lpOverlapped,
mswait);
numComplete = 1;
if (!rc && state->entries[0].lpOverlapped == NULL) {
// timeout. Return.
return 0;
} else {
// check if more completions are ready
int lrc = 1;
rc = 1;
numComplete = 1;

while (numComplete < MAX_COMPLETE_PER_POLL) {
lrc = GetQueuedCompletionStatus(state->iocp,
&state->entries[numComplete].dwNumberOfBytesTransferred,
&state->entries[numComplete].lpCompletionKey,
&state->entries[numComplete].lpOverlapped,
0);
if (lrc) {
numComplete++;
} else {
if (state->entries[numComplete].lpOverlapped == NULL) break;
}
}
}
}

if (rc && numComplete > 0) {
LPOVERLAPPED_ENTRY entry = state->entries;
for (j = 0; j < numComplete && numevents < AE_SETSIZE; j++, entry++) {
/* the competion key is the socket */
SOCKET sock = (SOCKET)entry->lpCompletionKey;
sockstate = aeGetSockState(state, sock);
if (sockstate == NULL) continue;
sockstate = aeGetExistingSockState(state, sock);
if (sockstate == NULL) continue;

if (sockstate->masks & LISTEN_SOCK) {
if ((sockstate->masks & LISTEN_SOCK) && entry->lpOverlapped != NULL) {
/* need to set event for listening */
aacceptreq *areq = (aacceptreq *)entry->lpOverlapped;
areq->next = sockstate->reqs;
Expand All @@ -259,34 +337,70 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
}
} else {
/* check if event is read complete (may be 0 length read) */
int matched = 0;
if (entry->lpOverlapped == &sockstate->ov_read) {
matched = 1;
sockstate->masks &= ~READ_QUEUED;
if (sockstate->masks & AE_READABLE) {
if (sockstate->masks & AE_READABLE) {
eventLoop->fired[numevents].fd = sock;
eventLoop->fired[numevents].mask = AE_READABLE;
numevents++;
}
} else if (sockstate->wreqs > 0) {
} else if (sockstate->wreqs > 0 && entry->lpOverlapped != NULL) {
/* should be write complete. Get results */
asendreq *areq = (asendreq *)entry->lpOverlapped;
/* call write complete callback so buffers can be freed */
if (areq->proc != NULL) {
DWORD written = 0;
DWORD flags;
WSAGetOverlappedResult(sock, &areq->ov, &written, FALSE, &flags);
areq->proc(areq->eventLoop, sock, &areq->req, (int)written);
matched = removeMatchFromList(&sockstate->wreqlist, areq);
if (matched) {
/* call write complete callback so buffers can be freed */
if (areq->proc != NULL) {
DWORD written = 0;
DWORD flags;
WSAGetOverlappedResult(sock, &areq->ov, &written, FALSE, &flags);
areq->proc(areq->eventLoop, sock, &areq->req, (int)written);
}
sockstate->wreqs--;
zfree(areq);
/* if no active write requests, set ready to write */
if (sockstate->wreqs == 0 && sockstate->masks & AE_WRITABLE) {
eventLoop->fired[numevents].fd = sock;
eventLoop->fired[numevents].mask = AE_WRITABLE;
numevents++;
}
}
sockstate->wreqs--;
zfree(areq);
/* if no active write requests, set ready to write */
if (sockstate->wreqs == 0 && sockstate->masks & AE_WRITABLE) {
eventLoop->fired[numevents].fd = sock;
eventLoop->fired[numevents].mask = AE_WRITABLE;
numevents++;
}

if (matched == 0) {
// no match for active connection. Try the closing list.
list *socklist = &(state->closing);
listNode *node;
node = listFirst(socklist);
while (node != NULL && matched == 0) {
sockstate = (aeSockState *)listNodeValue(node);
if (sockstate->fd == sock) {
if (entry->lpOverlapped == &sockstate->ov_read) {
// read complete
sockstate->masks &= ~READ_QUEUED;
matched = 1;
} else {
// check pending writes
asendreq *areq = (asendreq *)entry->lpOverlapped;
matched = removeMatchFromList(&sockstate->wreqlist, areq);
if (matched) {
sockstate->wreqs--;
zfree(areq);
}
}
}
node = listNextNode(node);
}
}

if (matched == 0) {
sockstate = NULL;
}
}
if (sockstate->wreqs == 0 && (sockstate->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) {
if (sockstate != NULL && sockstate->wreqs == 0 &&
(sockstate->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) {
// safe to delete sockstate
aeDelSockState(state, sockstate);
}
Expand Down
8 changes: 4 additions & 4 deletions src/win32_wsiocp.c
Expand Up @@ -21,6 +21,7 @@
*/

#include "ae.h"
#include "adlist.h"
#include "win32fixes.h"
#include "zmalloc.h"
#include <mswsock.h>
Expand Down Expand Up @@ -199,6 +200,7 @@ int aeWinAccept(int fd, struct sockaddr *sa, socklen_t *len) {

aeWinSocketAttach(acceptsock);

zfree(areq->buf);
zfree(areq);

/* queue another accept */
Expand Down Expand Up @@ -293,6 +295,7 @@ int aeWinSocketSend(int fd, char *buf, int len, int flags,
if (SUCCEEDED_WITH_IOCP(result == 0)){
errno = WSA_IO_PENDING;
sockstate->wreqs++;
listAddNodeTail(&sockstate->wreqlist, areq);
} else {
errno = WSAGetLastError();
zfree(areq);
Expand Down Expand Up @@ -359,10 +362,7 @@ int aeWinSocketDetach(int fd, int shutd) {
}
}
sockstate->masks &= ~(SOCKET_ATTACHED | AE_WRITABLE | AE_READABLE);
if (sockstate->wreqs == 0 && (sockstate->masks & READ_QUEUED) == 0) {
// safe to delete sockstate
aeDelSockState(iocpState, sockstate);
}
aeDelSockState(iocpState, sockstate);
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions src/win32_wsiocp.h
Expand Up @@ -55,6 +55,7 @@ typedef struct aeSockState {
aacceptreq *reqs;
int wreqs;
OVERLAPPED ov_read;
list wreqlist;
} aeSockState;

typedef aeSockState * fnGetSockState(void *apistate, int fd);
Expand Down

0 comments on commit 138eb25

Please sign in to comment.