Permalink
Browse files

windows sockets with IOCP

  • Loading branch information...
1 parent 734dabb commit ac1cdea778a7f7e9e2322e13290389fc4baabf27 @HenryRawas HenryRawas committed with Igor Zinkovsky Jan 27, 2012
Showing with 4,497 additions and 240 deletions.
  1. +20 −0 .gitignore
  2. +9 −2 deps/hiredis/async.c
  3. +65 −0 deps/hiredis/dict.c
  4. +10 −0 deps/hiredis/dict.h
  5. +83 −7 deps/hiredis/hiredis.c
  6. +20 −0 deps/hiredis/hiredis.h
  7. +179 −1 deps/hiredis/net.c
  8. +7 −2 deps/hiredis/sds.h
  9. +244 −2 deps/linenoise/linenoise.c
  10. +101 −0 msvs/RedisBenchmark/RedisBenchmark.vcxproj
  11. +82 −0 msvs/RedisCheckAof/RedisCheckAof.vcxproj
  12. +85 −0 msvs/RedisCheckDump/RedisCheckDump.vcxproj
  13. +95 −0 msvs/RedisCli/RedisCli.vcxproj
  14. +56 −0 msvs/RedisServer.sln
  15. +152 −0 msvs/RedisServer.vcxproj
  16. +100 −0 msvs/hiredis/hiredis.vcxproj
  17. +20 −7 src/ae.c
  18. +224 −0 src/ae_wsiocp.c
  19. +231 −2 src/anet.c
  20. +116 −4 src/aof.c
  21. +10 −3 src/bio.c
  22. +36 −22 src/config.c
  23. +2 −2 src/db.c
  24. +8 −2 src/debug.c
  25. +88 −1 src/dict.c
  26. +13 −0 src/dict.h
  27. +4 −0 src/fmacros.h
  28. +2 −2 src/intset.c
  29. +7 −7 src/lzf_c.c
  30. +1 −1 src/lzf_d.c
  31. +240 −5 src/networking.c
  32. +2 −2 src/object.c
  33. +10 −0 src/pqsort.c
  34. +1 −1 src/pubsub.c
  35. +40 −4 src/rdb.c
  36. +76 −6 src/redis-benchmark.c
  37. +45 −5 src/redis-check-aof.c
  38. +78 −16 src/redis-check-dump.c
  39. +41 −5 src/redis-cli.c
  40. +148 −11 src/redis.c
  41. +15 −1 src/redis.h
  42. +7 −0 src/release.c
  43. +181 −0 src/replication.c
  44. +20 −17 src/sds.c
  45. +4 −0 src/sds.h
  46. +7 −0 src/sha1.c
  47. +7 −7 src/sort.c
  48. +8 −1 src/syncio.c
  49. +5 −5 src/t_hash.c
  50. +22 −16 src/t_list.c
  51. +1 −1 src/t_set.c
  52. +21 −18 src/t_zset.c
  53. +24 −4 src/util.c
  54. +372 −0 src/win32_wsiocp.c
  55. +74 −0 src/win32_wsiocp.h
  56. +565 −0 src/win32fixes.c
  57. +288 −0 src/win32fixes.h
  58. +9 −9 src/ziplist.c
  59. +5 −2 src/zipmap.c
  60. +23 −0 src/zmalloc.c
  61. +3 −0 src/zmalloc.h
  62. +1 −1 tests/integration/aof.tcl
  63. +81 −35 tests/support/server.tcl
  64. +3 −1 tests/unit/protocol.tcl
View
@@ -17,3 +17,23 @@ release.h
src/transfer.sh
src/configs
src/redis-server.dSYM
+*.user
+*.exe
+*.sdf
+*.suo
+deps/pthreads-win32/
+msvs/Debug/
+msvs/Release/
+msvs/RedisBenchmark/Debug/
+msvs/RedisBenchmark/Release/
+msvs/RedisCheckAof/Debug/
+msvs/RedisCheckAof/Release/
+msvs/RedisCheckDump/Debug/
+msvs/RedisCheckDump/Release/
+msvs/RedisCli/Debug/
+msvs/RedisCli/Release/
+msvs/hiredis/Debug/
+msvs/hiredis/Release/
+msvs/ipch
+msvs/RedisServer.opensdf
+
View
@@ -30,14 +30,21 @@
*/
#include <string.h>
-#include <strings.h>
+#ifndef _WIN32
+ #include <strings.h>
+#endif
#include <assert.h>
#include <ctype.h>
#include "async.h"
#include "dict.c"
#include "sds.h"
#include "util.h"
+#ifdef _WIN32
+#define strcasecmp _stricmp
+#define strncasecmp _strnicmp
+#endif
+
/* Forward declaration of function in hiredis.c */
void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
@@ -47,8 +54,8 @@ static unsigned int callbackHash(const void *key) {
}
static void *callbackValDup(void *privdata, const void *src) {
- ((void) privdata);
redisCallback *dup = malloc(sizeof(*dup));
+ ((void) privdata);
memcpy(dup,src,sizeof(*dup));
return dup;
}
View
@@ -42,7 +42,11 @@
/* -------------------------- private prototypes ---------------------------- */
static int _dictExpandIfNeeded(dict *ht);
+#ifdef _WIN32
+static size_t _dictNextPower(size_t size);
+#else
static unsigned long _dictNextPower(unsigned long size);
+#endif
static int _dictKeyIndex(dict *ht, const void *key);
static int _dictInit(dict *ht, dictType *type, void *privDataPtr);
@@ -85,6 +89,53 @@ static int _dictInit(dict *ht, dictType *type, void *privDataPtr) {
}
/* Expand or create the hashtable */
+#ifdef _WIN32
+static int dictExpand(dict *ht, size_t size) {
+ dict n; /* the new hashtable */
+ size_t realsize = _dictNextPower(size), i;
+
+ /* the size is invalid if it is smaller than the number of
+ * elements already inside the hashtable */
+ if (ht->used > size)
+ return DICT_ERR;
+
+ _dictInit(&n, ht->type, ht->privdata);
+ n.size = realsize;
+ n.sizemask = realsize-1;
+ n.table = calloc(realsize,sizeof(dictEntry*));
+
+ /* Copy all the elements from the old to the new table:
+ * note that if the old hash table is empty ht->size is zero,
+ * so dictExpand just creates an hash table. */
+ n.used = ht->used;
+ for (i = 0; i < ht->size && ht->used > 0; i++) {
+ dictEntry *he, *nextHe;
+
+ if (ht->table[i] == NULL) continue;
+
+ /* For each hash entry on this slot... */
+ he = ht->table[i];
+ while(he) {
+ unsigned int h;
+
+ nextHe = he->next;
+ /* Get the new element index */
+ h = dictHashKey(ht, he->key) & n.sizemask;
+ he->next = n.table[h];
+ n.table[h] = he;
+ ht->used--;
+ /* Pass to the next element */
+ he = nextHe;
+ }
+ }
+ assert(ht->used == 0);
+ free(ht->table);
+
+ /* Remap the new hashtable in the old */
+ *ht = n;
+ return DICT_OK;
+}
+#else
static int dictExpand(dict *ht, unsigned long size) {
dict n; /* the new hashtable */
unsigned long realsize = _dictNextPower(size), i;
@@ -130,6 +181,7 @@ static int dictExpand(dict *ht, unsigned long size) {
*ht = n;
return DICT_OK;
}
+#endif
/* Add an element to the target hash table */
static int dictAdd(dict *ht, void *key, void *val) {
@@ -303,6 +355,18 @@ static int _dictExpandIfNeeded(dict *ht) {
}
/* Our hash table capability is a power of two */
+#ifdef _WIN32
+static size_t _dictNextPower(size_t size) {
+ size_t i = DICT_HT_INITIAL_SIZE;
+
+ if (size >= LONG_MAX) return LONG_MAX;
+ while(1) {
+ if (i >= size)
+ return i;
+ i *= 2;
+ }
+}
+#else
static unsigned long _dictNextPower(unsigned long size) {
unsigned long i = DICT_HT_INITIAL_SIZE;
@@ -313,6 +377,7 @@ static unsigned long _dictNextPower(unsigned long size) {
i *= 2;
}
}
+#endif
/* Returns the index of a free slot that can be populated with
* an hash entry for the given 'key'.
View
@@ -60,9 +60,15 @@ typedef struct dictType {
typedef struct dict {
dictEntry **table;
dictType *type;
+#ifdef _WIN32
+ size_t size;
+ size_t sizemask;
+ size_t used;
+#else
unsigned long size;
unsigned long sizemask;
unsigned long used;
+#endif
void *privdata;
} dict;
@@ -113,7 +119,11 @@ typedef struct dictIterator {
/* API */
static unsigned int dictGenHashFunction(const unsigned char *buf, int len);
static dict *dictCreate(dictType *type, void *privDataPtr);
+#ifdef _WIN32
+static int dictExpand(dict *ht, size_t size);
+#else
static int dictExpand(dict *ht, unsigned long size);
+#endif
static int dictAdd(dict *ht, void *key, void *val);
static int dictReplace(dict *ht, void *key, void *val);
static int dictDelete(dict *ht, const void *key);
View
@@ -31,7 +31,9 @@
#include <string.h>
#include <stdlib.h>
-#include <unistd.h>
+#ifndef _WIN32
+ #include <unistd.h>
+#endif
#include <assert.h>
#include <errno.h>
#include <ctype.h>
@@ -296,7 +298,11 @@ static int processBulkItem(redisReader *r) {
redisReadTask *cur = &(r->rstack[r->ridx]);
void *obj = NULL;
char *p, *s;
+#ifdef _WIN32
+ long long len;
+#else
long len;
+#endif
unsigned long bytelen;
int success = 0;
@@ -316,10 +322,10 @@ static int processBulkItem(redisReader *r) {
success = 1;
} else {
/* Only continue when the buffer contains the entire bulk item. */
- bytelen += len+2; /* include \r\n */
+ bytelen += (unsigned long)len+2; /* include \r\n */
if (r->pos+bytelen <= r->len) {
if (r->fn && r->fn->createString)
- obj = r->fn->createString(cur,s+2,len);
+ obj = r->fn->createString(cur,s+2,(size_t)len);
else
obj = (void*)REDIS_REPLY_STRING;
success = 1;
@@ -343,7 +349,11 @@ static int processMultiBulkItem(redisReader *r) {
redisReadTask *cur = &(r->rstack[r->ridx]);
void *obj;
char *p;
+#ifdef _WIN32
+ long long elements;
+#else
long elements;
+#endif
int root = 0;
/* Set error for nested multi bulks with depth > 1 */
@@ -365,13 +375,13 @@ static int processMultiBulkItem(redisReader *r) {
moveToNextTask(r);
} else {
if (r->fn && r->fn->createArray)
- obj = r->fn->createArray(cur,elements);
+ obj = r->fn->createArray(cur,(int)elements);
else
obj = (void*)REDIS_REPLY_ARRAY;
/* Modify task stack when there are more than 0 elements. */
if (elements > 0) {
- cur->elements = elements;
+ cur->elements = (int)elements;
cur->obj = obj;
r->ridx++;
r->rstack[r->ridx].type = -1;
@@ -596,7 +606,7 @@ static int intlen(int i) {
/* Helper function for redisvFormatCommand(). */
static void addArgument(sds a, char ***argv, int *argc, int *totlen) {
(*argc)++;
- if ((*argv = realloc(*argv, sizeof(char*)*(*argc))) == NULL) redisOOM();
+ if ((*argv = (char **)realloc(*argv, sizeof(char*)*(*argc))) == NULL) redisOOM();
if (totlen) *totlen = *totlen+1+intlen(sdslen(a))+2+sdslen(a)+2;
(*argv)[(*argc)-1] = a;
}
@@ -697,7 +707,11 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
}
/* Consume and discard vararg */
+#ifdef _WIN32
+ va_arg(ap,void *);
+#else
va_arg(ap,void);
+#endif
}
}
touched = 1;
@@ -717,11 +731,15 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
totlen += 1+intlen(argc)+2;
/* Build the command at protocol level */
- cmd = malloc(totlen+1);
+ cmd = (char *)malloc(totlen+1);
if (!cmd) redisOOM();
pos = sprintf(cmd,"*%d\r\n",argc);
for (j = 0; j < argc; j++) {
+#ifdef _WIN32
+ pos += sprintf(cmd+pos,"$%llu\r\n",(unsigned long long)sdslen(argv[j]));
+#else
pos += sprintf(cmd+pos,"$%zu\r\n",sdslen(argv[j]));
+#endif
memcpy(cmd+pos,argv[j],sdslen(argv[j]));
pos += sdslen(argv[j]);
sdsfree(argv[j]);
@@ -780,7 +798,11 @@ int redisFormatCommandArgv(char **target, int argc, const char **argv, const siz
pos = sprintf(cmd,"*%d\r\n",argc);
for (j = 0; j < argc; j++) {
len = argvlen ? argvlen[j] : strlen(argv[j]);
+#ifdef _WIN32
+ pos += sprintf(cmd+pos,"$%llu\r\n",(unsigned long long)len);
+#else
pos += sprintf(cmd+pos,"$%zu\r\n",len);
+#endif
memcpy(cmd+pos,argv[j],len);
pos += len;
cmd[pos++] = '\r';
@@ -815,7 +837,11 @@ static redisContext *redisContextInit(void) {
void redisFree(redisContext *c) {
if (c->fd > 0)
+#ifdef _WIN32
+ closesocket(c->fd);
+#else
close(c->fd);
+#endif
if (c->errstr != NULL)
sdsfree(c->errstr);
if (c->obuf != NULL)
@@ -870,6 +896,21 @@ redisContext *redisConnectUnixNonBlock(const char *path) {
return c;
}
+/* initializers if caller handles connection */
+redisContext *redisConnected() {
+ redisContext *c = redisContextInit();
+ c->fd = -1;
+ c->flags |= REDIS_BLOCK;
+ return c;
+}
+
+redisContext *redisConnectedNonBlock() {
+ redisContext *c = redisContextInit();
+ c->fd = -1;
+ c->flags &= ~REDIS_BLOCK;
+ return c;
+}
+
/* Set read/write timeout on a blocking socket. */
int redisSetTimeout(redisContext *c, struct timeval tv) {
if (c->flags & REDIS_BLOCK)
@@ -902,7 +943,16 @@ static void __redisCreateReplyReader(redisContext *c) {
* see if there is a reply available. */
int redisBufferRead(redisContext *c) {
char buf[2048];
+#ifdef _WIN32
+ int nread = recv((SOCKET)c->fd,buf,sizeof(buf),0);
+ if (nread == -1) {
+ errno = WSAGetLastError();
+ if ((errno == ENOENT) || (errno == WSAEWOULDBLOCK))
+ errno = EAGAIN;
+ }
+#else
int nread = read(c->fd,buf,sizeof(buf));
+#endif
if (nread == -1) {
if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
/* Try again later */
@@ -921,6 +971,23 @@ int redisBufferRead(redisContext *c) {
return REDIS_OK;
}
+/* Use this function if the caller has already read the data. It will
+ * feed bytes to the reply parser.
+ *
+ * After this function is called, you may use redisContextReadReply to
+ * see if there is a reply available. */
+int redisBufferReadDone(redisContext *c, char *buf, int nread) {
+ if (nread == 0) {
+ __redisSetError(c,REDIS_ERR_EOF,
+ sdsnew("Server closed the connection"));
+ return REDIS_ERR;
+ } else {
+ __redisCreateReplyReader(c);
+ redisReplyReaderFeed(c->reader,buf,nread);
+ }
+ return REDIS_OK;
+}
+
/* Write the output buffer to the socket.
*
* Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
@@ -933,7 +1000,16 @@ int redisBufferRead(redisContext *c) {
int redisBufferWrite(redisContext *c, int *done) {
int nwritten;
if (sdslen(c->obuf) > 0) {
+#ifdef _WIN32
+ nwritten = send((SOCKET)c->fd,c->obuf,sdslen(c->obuf),0);
+ if (nwritten == -1) {
+ errno = WSAGetLastError();
+ if ((errno == ENOENT) || (errno == WSAEWOULDBLOCK))
+ errno = EAGAIN;
+ }
+#else
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
+#endif
if (nwritten == -1) {
if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
/* Try again later */
Oops, something went wrong.

0 comments on commit ac1cdea

Please sign in to comment.