Skip to content
This repository has been archived by the owner on Feb 20, 2021. It is now read-only.

Commit

Permalink
Merge commit '994a7b5' into bksavecow
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryRawas committed Jul 10, 2012
2 parents f6665eb + 994a7b5 commit 138eb25
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 30 deletions.
166 changes: 140 additions & 26 deletions src/ae_wsiocp.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct aeApiState {
int setsize; int setsize;
OVERLAPPED_ENTRY entries[MAX_COMPLETE_PER_POLL]; OVERLAPPED_ENTRY entries[MAX_COMPLETE_PER_POLL];
list lookup[MAX_SOCKET_LOOKUP]; list lookup[MAX_SOCKET_LOOKUP];
list closing;
} aeApiState; } aeApiState;


/* convert socket value to an index /* convert socket value to an index
Expand All @@ -65,7 +66,7 @@ int aeSocketIndex(int fd) {
return fd % MAX_SOCKET_LOOKUP; return fd % MAX_SOCKET_LOOKUP;
} }


/* get data for socket / fd being monitored */ /* get data for socket / fd being monitored. Create if not found*/
aeSockState *aeGetSockState(void *apistate, int fd) { aeSockState *aeGetSockState(void *apistate, int fd) {
int sindex; int sindex;
listNode *node; listNode *node;
Expand All @@ -87,6 +88,11 @@ aeSockState *aeGetSockState(void *apistate, int fd) {
sockState = (aeSockState *) zmalloc(sizeof(aeSockState)); sockState = (aeSockState *) zmalloc(sizeof(aeSockState));
if (sockState != NULL) { if (sockState != NULL) {
sockState->fd = fd; sockState->fd = fd;
sockState->masks = 0;
sockState->wreqs = 0;
sockState->reqs = NULL;
memset(&sockState->wreqlist, 0, sizeof(sockState->wreqlist));

if (listAddNodeHead(socklist, sockState) != NULL) { if (listAddNodeHead(socklist, sockState) != NULL) {
return sockState; return sockState;
} else { } else {
Expand All @@ -97,22 +103,72 @@ aeSockState *aeGetSockState(void *apistate, int fd) {
} }


/* get data for socket / fd being monitored */ /* get data for socket / fd being monitored */
void aeDelSockState(void *apistate, aeSockState *sockState) { aeSockState *aeGetExistingSockState(void *apistate, int fd) {
int sindex; int sindex;
listNode *node; listNode *node;
list *socklist; list *socklist;
if (apistate == NULL) return; aeSockState *sockState;
if (apistate == NULL) return NULL;


sindex = aeSocketIndex(sockState->fd); sindex = aeSocketIndex(fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]); socklist = &(((aeApiState *)apistate)->lookup[sindex]);
node = listFirst(socklist); node = listFirst(socklist);
while (node != NULL) { while (node != NULL) {
if ((aeSockState *)listNodeValue(node) == sockState) { sockState = (aeSockState *)listNodeValue(node);
if (sockState->fd == fd) {
return sockState;
}
node = listNextNode(node);
}

return NULL;
}

// find matching value in list and remove. If found return 1
int removeMatchFromList(list *socklist, void *value) {
listNode *node;
node = listFirst(socklist);
while (node != NULL) {
if (listNodeValue(node) == value) {
listDelNode(socklist, node); listDelNode(socklist, node);
return; return 1;
} }
node = listNextNode(node); node = listNextNode(node);
} }
return 0;
}

/* delete data for socket / fd being monitored */
void aeDelSockState(void *apistate, aeSockState *sockState) {
int sindex;
list *socklist;

if (apistate == NULL) return;

if (sockState->wreqs == 0 && (sockState->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) {
// see if in active list
sindex = aeSocketIndex(sockState->fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]);
if (removeMatchFromList(socklist, sockState) == 1) {
zfree(sockState);
return;
}
// try closing list
socklist = &(((aeApiState *)apistate)->closing);
if (removeMatchFromList(socklist, sockState) == 1) {
zfree(sockState);
return;
}
} else {
// not safe to delete. Move to closing
sindex = aeSocketIndex(sockState->fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]);
if (removeMatchFromList(socklist, sockState) == 1) {
// removed from active list. add to closing list
socklist = &(((aeApiState *)apistate)->closing);
listAddNodeHead(socklist, sockState);
}
}
} }


/* Called by ae to initialize state */ /* Called by ae to initialize state */
Expand Down Expand Up @@ -191,6 +247,7 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
return -1; return -1;
} }
sockstate->wreqs++; sockstate->wreqs++;
listAddNodeTail(&sockstate->wreqlist, areq);
} }
} }
return 0; return 0;
Expand Down Expand Up @@ -235,18 +292,39 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
&state->entries[0].lpCompletionKey, &state->entries[0].lpCompletionKey,
&state->entries[0].lpOverlapped, &state->entries[0].lpOverlapped,
mswait); mswait);
numComplete = 1; if (!rc && state->entries[0].lpOverlapped == NULL) {
// timeout. Return.
return 0;
} else {
// check if more completions are ready
int lrc = 1;
rc = 1;
numComplete = 1;

while (numComplete < MAX_COMPLETE_PER_POLL) {
lrc = GetQueuedCompletionStatus(state->iocp,
&state->entries[numComplete].dwNumberOfBytesTransferred,
&state->entries[numComplete].lpCompletionKey,
&state->entries[numComplete].lpOverlapped,
0);
if (lrc) {
numComplete++;
} else {
if (state->entries[numComplete].lpOverlapped == NULL) break;
}
}
}
} }


if (rc && numComplete > 0) { if (rc && numComplete > 0) {
LPOVERLAPPED_ENTRY entry = state->entries; LPOVERLAPPED_ENTRY entry = state->entries;
for (j = 0; j < numComplete && numevents < AE_SETSIZE; j++, entry++) { for (j = 0; j < numComplete && numevents < AE_SETSIZE; j++, entry++) {
/* the competion key is the socket */ /* the competion key is the socket */
SOCKET sock = (SOCKET)entry->lpCompletionKey; SOCKET sock = (SOCKET)entry->lpCompletionKey;
sockstate = aeGetSockState(state, sock); sockstate = aeGetExistingSockState(state, sock);
if (sockstate == NULL) continue; if (sockstate == NULL) continue;


if (sockstate->masks & LISTEN_SOCK) { if ((sockstate->masks & LISTEN_SOCK) && entry->lpOverlapped != NULL) {
/* need to set event for listening */ /* need to set event for listening */
aacceptreq *areq = (aacceptreq *)entry->lpOverlapped; aacceptreq *areq = (aacceptreq *)entry->lpOverlapped;
areq->next = sockstate->reqs; areq->next = sockstate->reqs;
Expand All @@ -259,34 +337,70 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
} }
} else { } else {
/* check if event is read complete (may be 0 length read) */ /* check if event is read complete (may be 0 length read) */
int matched = 0;
if (entry->lpOverlapped == &sockstate->ov_read) { if (entry->lpOverlapped == &sockstate->ov_read) {
matched = 1;
sockstate->masks &= ~READ_QUEUED; sockstate->masks &= ~READ_QUEUED;
if (sockstate->masks & AE_READABLE) { if (sockstate->masks & AE_READABLE) {
eventLoop->fired[numevents].fd = sock; eventLoop->fired[numevents].fd = sock;
eventLoop->fired[numevents].mask = AE_READABLE; eventLoop->fired[numevents].mask = AE_READABLE;
numevents++; numevents++;
} }
} else if (sockstate->wreqs > 0) { } else if (sockstate->wreqs > 0 && entry->lpOverlapped != NULL) {
/* should be write complete. Get results */ /* should be write complete. Get results */
asendreq *areq = (asendreq *)entry->lpOverlapped; asendreq *areq = (asendreq *)entry->lpOverlapped;
/* call write complete callback so buffers can be freed */ matched = removeMatchFromList(&sockstate->wreqlist, areq);
if (areq->proc != NULL) { if (matched) {
DWORD written = 0; /* call write complete callback so buffers can be freed */
DWORD flags; if (areq->proc != NULL) {
WSAGetOverlappedResult(sock, &areq->ov, &written, FALSE, &flags); DWORD written = 0;
areq->proc(areq->eventLoop, sock, &areq->req, (int)written); DWORD flags;
WSAGetOverlappedResult(sock, &areq->ov, &written, FALSE, &flags);
areq->proc(areq->eventLoop, sock, &areq->req, (int)written);
}
sockstate->wreqs--;
zfree(areq);
/* if no active write requests, set ready to write */
if (sockstate->wreqs == 0 && sockstate->masks & AE_WRITABLE) {
eventLoop->fired[numevents].fd = sock;
eventLoop->fired[numevents].mask = AE_WRITABLE;
numevents++;
}
} }
sockstate->wreqs--; }
zfree(areq);
/* if no active write requests, set ready to write */ if (matched == 0) {
if (sockstate->wreqs == 0 && sockstate->masks & AE_WRITABLE) { // no match for active connection. Try the closing list.
eventLoop->fired[numevents].fd = sock; list *socklist = &(state->closing);
eventLoop->fired[numevents].mask = AE_WRITABLE; listNode *node;
numevents++; node = listFirst(socklist);
while (node != NULL && matched == 0) {
sockstate = (aeSockState *)listNodeValue(node);
if (sockstate->fd == sock) {
if (entry->lpOverlapped == &sockstate->ov_read) {
// read complete
sockstate->masks &= ~READ_QUEUED;
matched = 1;
} else {
// check pending writes
asendreq *areq = (asendreq *)entry->lpOverlapped;
matched = removeMatchFromList(&sockstate->wreqlist, areq);
if (matched) {
sockstate->wreqs--;
zfree(areq);
}
}
}
node = listNextNode(node);
} }
} }

if (matched == 0) {
sockstate = NULL;
}
} }
if (sockstate->wreqs == 0 && (sockstate->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) { if (sockstate != NULL && sockstate->wreqs == 0 &&
(sockstate->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) {
// safe to delete sockstate // safe to delete sockstate
aeDelSockState(state, sockstate); aeDelSockState(state, sockstate);
} }
Expand Down
8 changes: 4 additions & 4 deletions src/win32_wsiocp.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/ */


#include "ae.h" #include "ae.h"
#include "adlist.h"
#include "win32fixes.h" #include "win32fixes.h"
#include "zmalloc.h" #include "zmalloc.h"
#include <mswsock.h> #include <mswsock.h>
Expand Down Expand Up @@ -199,6 +200,7 @@ int aeWinAccept(int fd, struct sockaddr *sa, socklen_t *len) {


aeWinSocketAttach(acceptsock); aeWinSocketAttach(acceptsock);


zfree(areq->buf);
zfree(areq); zfree(areq);


/* queue another accept */ /* queue another accept */
Expand Down Expand Up @@ -293,6 +295,7 @@ int aeWinSocketSend(int fd, char *buf, int len, int flags,
if (SUCCEEDED_WITH_IOCP(result == 0)){ if (SUCCEEDED_WITH_IOCP(result == 0)){
errno = WSA_IO_PENDING; errno = WSA_IO_PENDING;
sockstate->wreqs++; sockstate->wreqs++;
listAddNodeTail(&sockstate->wreqlist, areq);
} else { } else {
errno = WSAGetLastError(); errno = WSAGetLastError();
zfree(areq); zfree(areq);
Expand Down Expand Up @@ -359,10 +362,7 @@ int aeWinSocketDetach(int fd, int shutd) {
} }
} }
sockstate->masks &= ~(SOCKET_ATTACHED | AE_WRITABLE | AE_READABLE); sockstate->masks &= ~(SOCKET_ATTACHED | AE_WRITABLE | AE_READABLE);
if (sockstate->wreqs == 0 && (sockstate->masks & READ_QUEUED) == 0) { aeDelSockState(iocpState, sockstate);
// safe to delete sockstate
aeDelSockState(iocpState, sockstate);
}
return 0; return 0;
} }


Expand Down
1 change: 1 addition & 0 deletions src/win32_wsiocp.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ typedef struct aeSockState {
aacceptreq *reqs; aacceptreq *reqs;
int wreqs; int wreqs;
OVERLAPPED ov_read; OVERLAPPED ov_read;
list wreqlist;
} aeSockState; } aeSockState;


typedef aeSockState * fnGetSockState(void *apistate, int fd); typedef aeSockState * fnGetSockState(void *apistate, int fd);
Expand Down

0 comments on commit 138eb25

Please sign in to comment.