Skip to content
This repository has been archived by the owner on Feb 20, 2021. It is now read-only.

Commit

Permalink
Merge 2.4.9
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryRawas committed Mar 21, 2012
2 parents fcd0061 + 9ca73f8 commit 19b1c6d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 23 deletions.
2 changes: 1 addition & 1 deletion deps/hiredis/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include <stdlib.h>
#include <string.h>
#ifndef _WIN32
#include <strings.h>
#include <strings.h>
#endif
#include <assert.h>
#include <ctype.h>
Expand Down
3 changes: 1 addition & 2 deletions deps/hiredis/hiredis.c
Original file line number Diff line number Diff line change
Expand Up @@ -1152,8 +1152,7 @@ int redisBufferRead(redisContext *c) {
* see if there is a reply available. */
int redisBufferReadDone(redisContext *c, char *buf, int nread) {
if (nread == 0) {
__redisSetError(c,REDIS_ERR_EOF,
sdsnew("Server closed the connection"));
__redisSetError(c,REDIS_ERR_EOF, sdsnew("Server closed the connection"));
return REDIS_ERR;
} else {
if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
Expand Down
77 changes: 63 additions & 14 deletions src/ae_wsiocp.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "ae.h"
#include "win32fixes.h"
#include "zmalloc.h"
#include "adlist.h"
#include "win32_wsiocp.h"
#include <mswsock.h>
#include <Guiddef.h>
Expand All @@ -46,23 +47,75 @@ typedef BOOL (WINAPI *sGetQueuedCompletionStatusEx)
BOOL fAlertable);
sGetQueuedCompletionStatusEx pGetQueuedCompletionStatusEx;

/* lookup structure for socket
* socket value is not an index. Convert socket to index
* and then find matching structure in list */

/* prefer prime number for number of indexes */
#define MAX_SOCKET_LOOKUP 1021

/* structure that keeps state of sockets and Completion port handle */
typedef struct aeApiState {
HANDLE iocp;
int setsize;
OVERLAPPED_ENTRY entries[MAX_COMPLETE_PER_POLL];
aeSockState *sockstate;
list lookup[MAX_SOCKET_LOOKUP];
} aeApiState;

/* convert socket value to an index
* Use simple modulo. We can add hash if needed */
int aeSocketIndex(int fd) {
return fd % MAX_SOCKET_LOOKUP;
}

/* utility to validate that socket / fd is being monitored */
/* get data for socket / fd being monitored */
aeSockState *aeGetSockState(void *apistate, int fd) {
int sindex;
listNode *node;
list *socklist;
aeSockState *sockState;
if (apistate == NULL) return NULL;
if (fd >= ((aeApiState *)apistate)->setsize) {
return NULL;

sindex = aeSocketIndex(fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]);
node = listFirst(socklist);
while (node != NULL) {
sockState = (aeSockState *)listNodeValue(node);
if (sockState->fd == fd) {
return sockState;
}
node = listNextNode(node);
}
// not found. Do lazy create of sockState.
sockState = (aeSockState *) zmalloc(sizeof(aeSockState));
if (sockState != NULL) {
sockState->fd = fd;
if (listAddNodeHead(socklist, sockState) != NULL) {
return sockState;
} else {
zfree(sockState);
}
}
return NULL;
}

/* get data for socket / fd being monitored */
void aeDelSockState(void *apistate, aeSockState *sockState) {
int sindex;
listNode *node;
list *socklist;
if (apistate == NULL) return;

sindex = aeSocketIndex(sockState->fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]);
node = listFirst(socklist);
while (node != NULL) {
if ((aeSockState *)listNodeValue(node) == sockState) {
listDelNode(socklist, node);
return;
}
node = listNextNode(node);
}
return &((aeApiState *)apistate)->sockstate[fd];
}

/* Called by ae to initialize state */
Expand All @@ -73,19 +126,12 @@ static int aeApiCreate(aeEventLoop *eventLoop) {
if (!state) return -1;
memset(state, 0, sizeof(aeApiState));

state->sockstate = (aeSockState *)zmalloc(sizeof(aeSockState) * AE_SETSIZE);
if (state->sockstate == NULL) {
zfree(state);
return -1;
}

/* create a single IOCP to be shared by all sockets */
state->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL,
0,
1);
if (state->iocp == NULL) {
zfree(state->sockstate);
zfree(state);
return -1;
}
Expand All @@ -101,15 +147,14 @@ static int aeApiCreate(aeEventLoop *eventLoop) {
state->setsize = AE_SETSIZE;
eventLoop->apidata = state;
/* initialize the IOCP socket code with state reference */
aeWinInit(state, state->iocp, aeGetSockState);
aeWinInit(state, state->iocp, aeGetSockState, aeDelSockState);
return 0;
}

/* termination */
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = (aeApiState *)eventLoop->apidata;
CloseHandle(state->iocp);
zfree(state->sockstate);
zfree(state);
aeWinCleanup();
}
Expand Down Expand Up @@ -244,6 +289,10 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
}
}
}
if (sockstate->wreqs == 0 && (sockstate->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) {
// safe to delete sockstate
aeDelSockState(state, sockstate);
}
}
}
return numevents;
Expand Down
13 changes: 9 additions & 4 deletions src/win32_wsiocp.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
static void *iocpState;
static HANDLE iocph;
static fnGetSockState * aeGetSockState;
static fnDelSockState * aeDelSockState;

static LPFN_ACCEPTEX acceptex;
static LPFN_GETACCEPTEXSOCKADDRS getaddrs;
Expand Down Expand Up @@ -293,12 +294,10 @@ int aeWinSocketSend(int fd, char *buf, int len, int flags,
NULL);

if (SUCCEEDED_WITH_IOCP(result == 0)){
sockstate->masks |= WRITE_ACTIVE;
errno = WSA_IO_PENDING;
sockstate->wreqs++;
} else {
errno = WSAGetLastError();
sockstate->masks &= ~WRITE_ACTIVE;
zfree(areq);
}
return SOCKET_ERROR;
Expand Down Expand Up @@ -362,14 +361,20 @@ int aeWinSocketDetach(int fd, int shutd) {
int err = WSAGetLastError();
}
}
sockstate->masks = 0;
sockstate->masks &= ~(SOCKET_ATTACHED | AE_WRITABLE | AE_READABLE);
if (sockstate->wreqs == 0 && (sockstate->masks & READ_QUEUED) == 0) {
// safe to delete sockstate
aeDelSockState(iocpState, sockstate);
}
return 0;
}

void aeWinInit(void *state, HANDLE iocp, fnGetSockState *getSockState) {
void aeWinInit(void *state, HANDLE iocp, fnGetSockState *getSockState,
fnDelSockState *delSockState) {
iocpState = state;
iocph = iocp;
aeGetSockState = getSockState;
aeDelSockState = delSockState;
}

void aeWinCleanup() {
Expand Down
5 changes: 3 additions & 2 deletions src/win32_wsiocp.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,21 @@ typedef struct aacceptreq {
/* per socket information */
typedef struct aeSockState {
int masks;
int fd;
aacceptreq *reqs;
int wreqs;
OVERLAPPED ov_read;
} aeSockState;

typedef aeSockState * fnGetSockState(void *apistate, int fd);
typedef void fnDelSockState(void *apistate, aeSockState *sockState);

#define READ_QUEUED 0x000100
#define WRITE_ACTIVE 0x000200
#define SOCKET_ATTACHED 0x000400
#define ACCEPT_PENDING 0x000800
#define LISTEN_SOCK 0x001000

void aeWinInit(void *state, HANDLE iocp, fnGetSockState *getSockState);
void aeWinInit(void *state, HANDLE iocp, fnGetSockState *getSockState, fnDelSockState *delSockState);
void aeWinCleanup();

#endif
Expand Down

0 comments on commit 19b1c6d

Please sign in to comment.