Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Copy on write for background save

  • Loading branch information...
commit 29da2a6bcbc8ca2464cb5c2ff03dcccc8f386dc7 1 parent 19f5071
@HenryRawas HenryRawas authored
View
10 msvs/RedisServer.sln
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 11.00
# Visual Studio 2010
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "RedisServer", "RedisServer.vcxproj", "{4A8559D8-D9F9-BC1B-07E5-9B20A8E39BE8}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "RedisServer", "RedisServer.vcxproj", "{07369961-4B20-6DE7-6FFC-973CAEBC7A28}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "hiredis", "hiredis\hiredis.vcxproj", "{13E85053-54B3-487B-8DDB-3430B1C1B3BF}"
EndProject
@@ -25,10 +25,10 @@ Global
Release|Win32 = Release|Win32
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {4A8559D8-D9F9-BC1B-07E5-9B20A8E39BE8}.Debug|Win32.ActiveCfg = Debug|Win32
- {4A8559D8-D9F9-BC1B-07E5-9B20A8E39BE8}.Debug|Win32.Build.0 = Debug|Win32
- {4A8559D8-D9F9-BC1B-07E5-9B20A8E39BE8}.Release|Win32.ActiveCfg = Release|Win32
- {4A8559D8-D9F9-BC1B-07E5-9B20A8E39BE8}.Release|Win32.Build.0 = Release|Win32
+ {07369961-4B20-6DE7-6FFC-973CAEBC7A28}.Debug|Win32.ActiveCfg = Debug|Win32
+ {07369961-4B20-6DE7-6FFC-973CAEBC7A28}.Debug|Win32.Build.0 = Debug|Win32
+ {07369961-4B20-6DE7-6FFC-973CAEBC7A28}.Release|Win32.ActiveCfg = Release|Win32
+ {07369961-4B20-6DE7-6FFC-973CAEBC7A28}.Release|Win32.Build.0 = Release|Win32
{13E85053-54B3-487B-8DDB-3430B1C1B3BF}.Debug|Win32.ActiveCfg = Debug|Win32
{13E85053-54B3-487B-8DDB-3430B1C1B3BF}.Debug|Win32.Build.0 = Debug|Win32
{13E85053-54B3-487B-8DDB-3430B1C1B3BF}.Release|Win32.ActiveCfg = Release|Win32
View
4 msvs/RedisServer.vcxproj
@@ -113,6 +113,8 @@
<ClCompile Include="..\src\t_zset.c" />
<ClCompile Include="..\src\util.c" />
<ClCompile Include="..\src\win32fixes.c" />
+ <ClCompile Include="..\src\win32_bksv.c" />
+ <ClCompile Include="..\src\win32_cow.c" />
<ClCompile Include="..\src\win32_wsiocp.c" />
<ClCompile Include="..\src\ziplist.c" />
<ClCompile Include="..\src\zipmap.c" />
@@ -142,6 +144,8 @@
<ClInclude Include="..\src\util.h" />
<ClInclude Include="..\src\version.h" />
<ClInclude Include="..\src\win32fixes.h" />
+ <ClInclude Include="..\src\win32_bksv.h" />
+ <ClInclude Include="..\src\win32_cow.h" />
<ClInclude Include="..\src\win32_wsiocp.h" />
<ClInclude Include="..\src\ziplist.h" />
<ClInclude Include="..\src\zipmap.h" />
View
218 src/aof.c
@@ -34,8 +34,7 @@ void stopAppendOnly(void) {
/* rewrite operation in progress? kill it, wait child exit */
if (server.bgrewritechildpid != -1) {
#ifdef _WIN32
- /* Windows placeholder for killing whatever lounched instead of fork() */
- w32CeaseAndDesist(server.bgsavechildpid);
+ bkgdsave_termthread();
#else
int statloc;
@@ -417,7 +416,7 @@ int loadAppendOnlyFile(char *filename) {
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
int rewriteAppendOnlyFile(char *filename) {
- dictIterator *di = NULL;
+ roDictIter *di = NULL;
dictEntry *de;
FILE *fp;
char tmpfile[256];
@@ -439,9 +438,26 @@ int rewriteAppendOnlyFile(char *filename) {
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
- dict *d = db->dict;
+ dict *d;
+
+#ifdef _WIN32
+ cowLock();
+ if (server.isBackgroundSaving == 1) {
+ /* use background DB copy */
+ db = server.cowSaveDb+j;
+ }
+ d = db->dict;
+ if (dictSize(d) == 0) {
+ cowUnlock();
+ continue;
+ }
+ di = roDBGetIterator(j);
+ cowUnlock();
+#else
+ d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
+#endif
if (!di) {
fclose(fp);
return REDIS_ERR;
@@ -452,7 +468,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (fwriteBulkLongLong(fp,j) == 0) goto werr;
/* Iterate this DB writing every entry */
- while((de = dictNext(di)) != NULL) {
+ while((de = roDictNext(di)) != NULL) {
sds keystr = dictGetEntryKey(de);
robj key, *o;
time_t expiretime;
@@ -462,10 +478,20 @@ int rewriteAppendOnlyFile(char *filename) {
initStaticStringObject(key,keystr);
expiretime = getExpire(db,&key);
+ cowLock();
+#ifdef _WIN32
+ if (o->type == REDIS_LIST ||
+ o->type == REDIS_SET ||
+ o->type == REDIS_ZSET ||
+ o->type == REDIS_HASH) {
+ o = (robj *)getRoConvertedObj(keystr, o);
+ }
+#endif
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
+ cowUnlock();
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
/* Key and value */
if (fwriteBulkObject(fp,&key) == 0) goto werr;
@@ -480,6 +506,7 @@ int rewriteAppendOnlyFile(char *filename) {
unsigned int vlen;
long long vlong;
+ cowUnlock();
while(ziplistGet(p,&vstr,&vlen,&vlong)) {
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
@@ -495,17 +522,36 @@ int rewriteAppendOnlyFile(char *filename) {
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
list *list = o->ptr;
listNode *ln;
- listIter li;
+ roListIter li;
- listRewind(list,&li);
- while((ln = listNext(&li))) {
+ roListRewind(list, NULL, &li);
+ cowUnlock();
+ while((ln = roListNext(&li))) {
robj *eleobj = listNodeValue(ln);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
}
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_LINKEDLISTARRAY) {
+ cowListArray *ar;
+ roListIter li;
+ listNode *ln;
+ cowUnlock();
+
+ ar = (cowListArray *)o->ptr;
+ roListRewind(NULL, ar, &li);
+ while((ln = roListNext(&li))) {
+ robj *eleobj = listNodeValue(ln);
+
+ if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+ if (fwriteBulkObject(fp,&key) == 0) goto werr;
+ if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+ }
+#endif
} else {
+ cowUnlock();
redisPanic("Unknown list encoding");
}
} else if (o->type == REDIS_SET) {
@@ -515,22 +561,43 @@ int rewriteAppendOnlyFile(char *filename) {
if (o->encoding == REDIS_ENCODING_INTSET) {
int ii = 0;
int64_t llval;
+ cowUnlock();
while(intsetGet(o->ptr,ii++,&llval)) {
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
}
} else if (o->encoding == REDIS_ENCODING_HT) {
- dictIterator *di = dictGetIterator(o->ptr);
+ roDictIter *di;
dictEntry *de;
- while((de = dictNext(di)) != NULL) {
+ di = roDictGetIterator(o->ptr, NULL);
+ cowUnlock();
+ while((de = roDictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
}
- dictReleaseIterator(di);
+ roDictReleaseIterator(di);
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_HTARRAY) {
+ dictEntry *de;
+ cowDictArray *ar;
+ roDictIter *di;
+ cowUnlock();
+
+ ar = (cowDictArray *)o->ptr;
+ di = roDictGetIterator(NULL, ar);
+ while((de = roDictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+ if (fwriteBulkObject(fp,&key) == 0) goto werr;
+ if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+ }
+ roDictReleaseIterator(di);
+#endif
} else {
+ cowUnlock();
redisPanic("Unknown set encoding");
}
} else if (o->type == REDIS_ZSET) {
@@ -545,6 +612,7 @@ int rewriteAppendOnlyFile(char *filename) {
long long vll;
double score;
+ cowUnlock();
eptr = ziplistIndex(zl,0);
redisAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
@@ -568,10 +636,30 @@ int rewriteAppendOnlyFile(char *filename) {
}
} else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
- dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
+ roZDictIter *di = roZDictGetIterator(zs->dict, NULL);
+ cowUnlock();
+
+ while((de = roZDictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ double *score = dictGetEntryVal(de);
+
+ if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+ if (fwriteBulkObject(fp,&key) == 0) goto werr;
+ if (fwriteBulkDouble(fp,*score) == 0) goto werr;
+ if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+ }
+ roZDictReleaseIterator(di);
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_HTZARRAY) {
+ dictEntry *de;
+ cowDictZArray *ar;
+ roZDictIter *di;
+ cowUnlock();
- while((de = dictNext(di)) != NULL) {
+ ar = (cowDictZArray *)o->ptr;
+ di = roZDictGetIterator(NULL, ar);
+ while((de = roZDictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
double *score = dictGetEntryVal(de);
@@ -580,8 +668,10 @@ int rewriteAppendOnlyFile(char *filename) {
if (fwriteBulkDouble(fp,*score) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
}
- dictReleaseIterator(di);
+ roZDictReleaseIterator(di);
+#endif
} else {
+ cowUnlock();
redisPanic("Unknown sorted set encoding");
}
} else if (o->type == REDIS_HASH) {
@@ -592,6 +682,7 @@ int rewriteAppendOnlyFile(char *filename) {
unsigned char *p = zipmapRewind(o->ptr);
unsigned char *field, *val;
unsigned int flen, vlen;
+ cowUnlock();
while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
@@ -601,11 +692,12 @@ int rewriteAppendOnlyFile(char *filename) {
if (fwriteBulkString(fp,(char*)val,vlen) == 0)
goto werr;
}
- } else {
- dictIterator *di = dictGetIterator(o->ptr);
+ } else if (o->encoding == REDIS_ENCODING_HT) {
dictEntry *de;
+ roDictIter *di = roDictGetIterator(o->ptr, NULL);
+ cowUnlock();
- while((de = dictNext(di)) != NULL) {
+ while((de = roDictNext(di)) != NULL) {
robj *field = dictGetEntryKey(de);
robj *val = dictGetEntryVal(de);
@@ -614,9 +706,33 @@ int rewriteAppendOnlyFile(char *filename) {
if (fwriteBulkObject(fp,field) == 0) goto werr;
if (fwriteBulkObject(fp,val) == 0) goto werr;
}
- dictReleaseIterator(di);
+ roDictReleaseIterator(di);
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_HTARRAY) {
+ dictEntry *de;
+ cowDictArray *ar;
+ roDictIter *di;
+ cowUnlock();
+
+ ar = (cowDictArray *)o->ptr;
+ di = roDictGetIterator(NULL, ar);
+ while((de = roDictNext(di)) != NULL) {
+ robj *field = dictGetEntryKey(de);
+ robj *val = dictGetEntryVal(de);
+
+ if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+ if (fwriteBulkObject(fp,&key) == 0) goto werr;
+ if (fwriteBulkObject(fp,field) == 0) goto werr;
+ if (fwriteBulkObject(fp,val) == 0) goto werr;
+ }
+ roDictReleaseIterator(di);
+ } else {
+ cowUnlock();
+ redisPanic("Unknown hash dictionary encoding");
+#endif
}
} else {
+ cowUnlock();
redisPanic("Unknown object type");
}
/* Save the expire time */
@@ -629,7 +745,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
}
}
- dictReleaseIterator(di);
+ roDictReleaseIterator(di);
}
/* Make sure data will not remain on the OS's output buffers */
@@ -651,7 +767,7 @@ int rewriteAppendOnlyFile(char *filename) {
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
- if (di) dictReleaseIterator(di);
+ if (di) roDictReleaseIterator(di);
return REDIS_ERR;
}
@@ -667,6 +783,41 @@ int rewriteAppendOnlyFile(char *filename) {
* finally will rename(2) the temp file in the actual file name.
* The the new file is reopened as the new append only file. Profit!
*/
+#ifdef _WIN32
+int rewriteAppendOnlyFileBackground(void) {
+ pid_t childpid;
+ char tmpfile[256];
+
+ if (server.bgrewritechildpid != -1) return REDIS_ERR;
+ if (server.bgsavechildpid != -1) return REDIS_ERR;
+
+ childpid = getpid();
+ snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", childpid);
+ server.aofrewrite_scheduled = 0;
+ server.bgrewritechildpid = childpid;
+ updateDictResizePolicy();
+ server.appendseldb = -1;
+
+ if (bkgdsave_start(tmpfile, rewriteAppendOnlyFile) == -1) {
+ server.rdbbkgdfsave.background = 0;
+ redisLog(REDIS_NOTICE,
+ "Foreground append only file rewriting started by pid %d", childpid);
+
+ if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
+ backgroundRewriteDoneHandler(0);
+ return REDIS_OK;
+ } else {
+ backgroundRewriteDoneHandler(0xff);
+ redisLog(REDIS_WARNING,
+ "Can't rewrite append only file in background: spoon: %s",
+ strerror(errno));
+ return REDIS_ERR;
+ }
+ }
+ return REDIS_OK; /* unreached */
+}
+
+#else
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
@@ -688,31 +839,6 @@ int rewriteAppendOnlyFileBackground(void) {
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
-#ifdef _WIN32
- if (childpid == -1) {
- char tmpfile[256];
-
- childpid = getpid();
- snprintf(tmpfile,256,"temp-rewriteaof-bg-%lld.aof", (long long)childpid);
- server.bgrewritechildpid = childpid;
- updateDictResizePolicy();
- server.appendseldb = -1;
-
- redisLog(REDIS_NOTICE,
- "Foreground append only file rewriting started by pid %lld",(long long)childpid);
-
- if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
- backgroundRewriteDoneHandler(0);
- return REDIS_OK;
- } else {
- backgroundRewriteDoneHandler(0xff);
- redisLog(REDIS_WARNING,
- "Can't rewrite append only file in background: spoon: %s",
- strerror(errno));
- return REDIS_ERR;
- }
- }
-#else
if (childpid == -1) {
redisLog(REDIS_WARNING,
"Can't rewrite append only file in background: fork: %s",
@@ -723,7 +849,6 @@ int rewriteAppendOnlyFileBackground(void) {
"Background append only file rewriting started by pid %d",childpid);
server.aofrewrite_scheduled = 0;
server.bgrewritechildpid = childpid;
-#endif
updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
@@ -734,6 +859,7 @@ int rewriteAppendOnlyFileBackground(void) {
}
return REDIS_OK; /* unreached */
}
+#endif
void bgrewriteaofCommand(redisClient *c) {
if (server.bgrewritechildpid != -1) {
View
41 src/db.c
@@ -36,8 +36,15 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
}
robj *lookupKeyWrite(redisDb *db, robj *key) {
+ robj *o;
expireIfNeeded(db,key);
- return lookupKey(db,key);
+ o = lookupKey(db,key);
+#ifdef _WIN32
+ if (server.isBackgroundSaving) {
+ o = cowEnsureWriteCopy(db, key, o);
+ }
+#endif
+ return o;
}
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
@@ -123,6 +130,12 @@ robj *dbRandomKey(redisDb *db) {
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbDelete(redisDb *db, robj *key) {
+#ifdef _WIN32
+ /* If copy on write, may need to copy dict before delete */
+ if (server.isBackgroundSaving) {
+ cowEnsureWriteCopy(db, key, NULL);
+ }
+#endif
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
@@ -135,6 +148,12 @@ long long emptyDb() {
long long removed = 0;
for (j = 0; j < server.dbnum; j++) {
+#ifdef _WIN32
+ /* If copy on write, may need to copy dict before delete */
+ if (server.isBackgroundSaving) {
+ cowEnsureWriteCopy(&server.db[j], NULL, NULL);
+ }
+#endif
removed += dictSize(server.db[j].dict);
dictEmpty(server.db[j].dict);
dictEmpty(server.db[j].expires);
@@ -171,6 +190,12 @@ void signalFlushedDb(int dbid) {
*----------------------------------------------------------------------------*/
void flushdbCommand(redisClient *c) {
+#ifdef _WIN32
+ /* If copy on write, may need to copy dict before delete */
+ if (server.isBackgroundSaving) {
+ cowEnsureWriteCopy(c->db, NULL, NULL);
+ }
+#endif
server.dirty += dictSize(c->db->dict);
signalFlushedDb(c->db->id);
dictEmpty(c->db->dict);
@@ -183,7 +208,11 @@ void flushallCommand(redisClient *c) {
server.dirty += emptyDb();
addReply(c,shared.ok);
if (server.bgsavechildpid != -1) {
+#ifdef _WIN32
+ bkgdsave_termthread();
+#else
kill(server.bgsavechildpid,SIGKILL);
+#endif
rdbRemoveTempFile(server.bgsavechildpid);
}
if (server.saveparamslen > 0) {
@@ -416,6 +445,11 @@ void moveCommand(redisClient *c) {
int removeExpire(redisDb *db, robj *key) {
/* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */
+#ifdef _WIN32
+ if (server.isBackgroundSaving) {
+ cowEnsureExpiresCopy(db);
+ }
+#endif
redisAssert(dictFind(db->dict,key->ptr) != NULL);
return dictDelete(db->expires,key->ptr) == DICT_OK;
}
@@ -423,6 +457,11 @@ int removeExpire(redisDb *db, robj *key) {
void setExpire(redisDb *db, robj *key, time_t when) {
dictEntry *de;
+#ifdef _WIN32
+ if (server.isBackgroundSaving) {
+ cowEnsureExpiresCopy(db);
+ }
+#endif
/* Reuse the sds from the main dict in the expire dict */
de = dictFind(db->dict,key->ptr);
redisAssert(de != NULL);
View
10 src/debug.c
@@ -299,6 +299,16 @@ void debugCommand(redisClient *c) {
usleep(utime);
addReply(c,shared.ok);
+#ifdef _WIN32
+ } else if (!strcasecmp(c->argv[1]->ptr,"flushload")) {
+ emptyDb();
+ if (rdbLoad(server.dbfilename) != REDIS_OK) {
+ addReplyError(c,"Error trying to load the RDB dump");
+ return;
+ }
+ redisLog(REDIS_WARNING,"DB reloaded by DEBUG flushload");
+ addReply(c,shared.ok);
+#endif
} else {
addReplyError(c,
"Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]");
View
25 src/object.c
@@ -124,6 +124,11 @@ void freeListObject(robj *o) {
case REDIS_ENCODING_ZIPLIST:
zfree(o->ptr);
break;
+#ifdef _WIN32
+ case REDIS_ENCODING_LINKEDLISTARRAY:
+ cowReleaseListArray(o->ptr);
+ break;
+#endif
default:
redisPanic("Unknown list encoding type");
}
@@ -137,6 +142,11 @@ void freeSetObject(robj *o) {
case REDIS_ENCODING_INTSET:
zfree(o->ptr);
break;
+#ifdef _WIN32
+ case REDIS_ENCODING_HTARRAY:
+ cowReleaseDictArray(o->ptr);
+ break;
+#endif
default:
redisPanic("Unknown set encoding type");
}
@@ -154,6 +164,11 @@ void freeZsetObject(robj *o) {
case REDIS_ENCODING_ZIPLIST:
zfree(o->ptr);
break;
+#ifdef _WIN32
+ case REDIS_ENCODING_HTZARRAY:
+ cowReleaseDictZArray(o->ptr);
+ break;
+#endif
default:
redisPanic("Unknown sorted set encoding");
}
@@ -167,6 +182,11 @@ void freeHashObject(robj *o) {
case REDIS_ENCODING_ZIPMAP:
zfree(o->ptr);
break;
+#ifdef _WIN32
+ case REDIS_ENCODING_HTARRAY:
+ cowReleaseDictArray(o->ptr);
+ break;
+#endif
default:
redisPanic("Unknown hash encoding type");
break;
@@ -182,6 +202,11 @@ void decrRefCount(void *obj) {
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
if (o->refcount == 1) {
+#ifdef _WIN32
+ if (server.isBackgroundSaving == 1) {
+ if (deferFreeObject(o) == 1) return;
+ }
+#endif
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
View
224 src/rdb.c
@@ -257,68 +257,120 @@ int rdbSaveObject(FILE *fp, robj *o) {
nwritten += n;
} else if (o->type == REDIS_LIST) {
/* Save a list value */
+ cowLock();
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);
+ cowUnlock();
if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
list *list = o->ptr;
- listIter li;
+ roListIter li;
listNode *ln;
+ int32_t len = listLength(list);
+ roListRewind(list, NULL, &li);
+ cowUnlock();
- if ((n = rdbSaveLen(fp,listLength(list))) == -1) return -1;
+ if ((n = rdbSaveLen(fp,len)) == -1) return -1;
nwritten += n;
- listRewind(list,&li);
- while((ln = listNext(&li))) {
+ while((ln = roListNext(&li))) {
robj *eleobj = listNodeValue(ln);
if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
nwritten += n;
}
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_LINKEDLISTARRAY) {
+ cowListArray *ar;
+ roListIter li;
+ listNode *ln;
+
+ ar = (cowListArray *)o->ptr;
+ cowUnlock();
+ if ((n = rdbSaveLen(fp,ar->numele)) == -1) return -1;
+ nwritten += n;
+
+ roListRewind(NULL, ar, &li);
+ while((ln = roListNext(&li))) {
+ robj *eleobj = listNodeValue(ln);
+ if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+ nwritten += n;
+ }
+#endif
} else {
+ cowUnlock();
redisPanic("Unknown list encoding");
}
} else if (o->type == REDIS_SET) {
/* Save a set value */
+ cowLock();
if (o->encoding == REDIS_ENCODING_HT) {
dict *set = o->ptr;
- dictIterator *di = dictGetIterator(set);
+ roDictIter *di;
dictEntry *de;
+ int32_t len = dictSize(set);
+ di = roDictGetIterator(set, NULL);
+ cowUnlock();
- if ((n = rdbSaveLen(fp,dictSize(set))) == -1) return -1;
+ if ((n = rdbSaveLen(fp,len)) == -1) return -1;
nwritten += n;
- while((de = dictNext(di)) != NULL) {
+ while((de = roDictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
nwritten += n;
}
- dictReleaseIterator(di);
+ roDictReleaseIterator(di);
} else if (o->encoding == REDIS_ENCODING_INTSET) {
size_t l = intsetBlobLen((intset*)o->ptr);
+ cowUnlock();
if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_HTARRAY) {
+ dictEntry *de;
+ cowDictArray *ar;
+ roDictIter *di;
+ cowUnlock();
+
+ ar = (cowDictArray *)o->ptr;
+ di = roDictGetIterator(NULL, ar);
+ if ((n = rdbSaveLen(fp,ar->numele)) == -1) return -1;
+ nwritten += n;
+
+ while((de = roDictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+ nwritten += n;
+ }
+ roDictReleaseIterator(di);
+#endif
} else {
+ cowUnlock();
redisPanic("Unknown set encoding");
}
} else if (o->type == REDIS_ZSET) {
/* Save a sorted set value */
+ cowLock();
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);
+ cowUnlock();
if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
- dictIterator *di = dictGetIterator(zs->dict);
+ roZDictIter *di = roZDictGetIterator(zs->dict, NULL);
dictEntry *de;
+ uint32_t len = dictSize(zs->dict);
+ cowUnlock();
- if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
+ if ((n = rdbSaveLen(fp,len)) == -1) return -1;
nwritten += n;
- while((de = dictNext(di)) != NULL) {
+ while((de = roZDictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
double *score = dictGetEntryVal(de);
@@ -327,25 +379,76 @@ int rdbSaveObject(FILE *fp, robj *o) {
if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
nwritten += n;
}
- dictReleaseIterator(di);
+ roZDictReleaseIterator(di);
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_HTZARRAY) {
+ dictEntry *de;
+ cowDictZArray *ar;
+ roZDictIter *di;
+ cowUnlock();
+
+ ar = (cowDictZArray *)o->ptr;
+ di = roZDictGetIterator(NULL, ar);
+ if ((n = rdbSaveLen(fp,ar->numele)) == -1) return -1;
+ nwritten += n;
+
+ while((de = roZDictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ double *score = dictGetEntryVal(de);
+
+ if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
+ nwritten += n;
+ }
+ roZDictReleaseIterator(di);
+#endif
} else {
+ cowUnlock();
redisPanic("Unknown sorted set encoding");
}
} else if (o->type == REDIS_HASH) {
/* Save a hash value */
+ cowLock();
if (o->encoding == REDIS_ENCODING_ZIPMAP) {
size_t l = zipmapBlobLen((unsigned char*)o->ptr);
+ cowUnlock();
if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
- } else {
- dictIterator *di = dictGetIterator(o->ptr);
+ } else if (o->encoding == REDIS_ENCODING_HT) {
+ roDictIter *di;
+ dictEntry *de;
+ uint32_t len = dictSize((dict*)o->ptr);
+ di = roDictGetIterator(o->ptr, NULL);
+ cowUnlock();
+
+ if ((n = rdbSaveLen(fp,len)) == -1) return -1;
+ nwritten += n;
+
+ while((de = roDictNext(di)) != NULL) {
+ robj *key = dictGetEntryKey(de);
+ robj *val = dictGetEntryVal(de);
+
+ if ((n = rdbSaveStringObject(fp,key)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveStringObject(fp,val)) == -1) return -1;
+ nwritten += n;
+ }
+ roDictReleaseIterator(di);
+#ifdef _WIN32
+ } else if (o->encoding == REDIS_ENCODING_HTARRAY) {
dictEntry *de;
+ cowDictArray *ar;
+ roDictIter *di;
+ cowUnlock();
- if ((n = rdbSaveLen(fp,dictSize((dict*)o->ptr))) == -1) return -1;
+ ar = (cowDictArray *)o->ptr;
+ di = roDictGetIterator(NULL, ar);
+ if ((n = rdbSaveLen(fp,ar->numele)) == -1) return -1;
nwritten += n;
- while((de = dictNext(di)) != NULL) {
+ while((de = roDictNext(di)) != NULL) {
robj *key = dictGetEntryKey(de);
robj *val = dictGetEntryVal(de);
@@ -354,7 +457,11 @@ int rdbSaveObject(FILE *fp, robj *o) {
if ((n = rdbSaveStringObject(fp,val)) == -1) return -1;
nwritten += n;
}
- dictReleaseIterator(di);
+ roDictReleaseIterator(di);
+ } else {
+ cowUnlock();
+ redisPanic("Unknown hash dictionary encoding");
+#endif
}
} else {
redisPanic("Unknown object type");
@@ -388,7 +495,7 @@ int getObjectSaveType(robj *o) {
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
int rdbSave(char *filename) {
- dictIterator *di = NULL;
+ roDictIter *di = NULL;
dictEntry *de;
FILE *fp;
char tmpfile[256];
@@ -408,9 +515,20 @@ int rdbSave(char *filename) {
if (fwrite("REDIS0002",9,1,fp) == 0) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
- dict *d = db->dict;
+ dict *d;
+#ifdef _WIN32
+ if (server.isBackgroundSaving == 1) {
+ /* use background DB copy */
+ db = server.cowSaveDb+j;
+ }
+ d = db->dict;
+ if (roDBDictSize(j) == 0) continue;
+ di = roDBGetIterator(j);
+#else
+ d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
+#endif
if (!di) {
fclose(fp);
return REDIS_ERR;
@@ -421,7 +539,7 @@ int rdbSave(char *filename) {
if (rdbSaveLen(fp,j) == -1) goto werr;
/* Iterate this DB writing every entry */
- while((de = dictNext(di)) != NULL) {
+ while((de = roDictNext(di)) != NULL) {
sds keystr = dictGetEntryKey(de);
robj key, *o = dictGetEntryVal(de);
time_t expiretime;
@@ -442,9 +560,18 @@ int rdbSave(char *filename) {
/* Save type, key, value */
if (rdbSaveType(fp,otype) == -1) goto werr;
if (rdbSaveStringObject(fp,&key) == -1) goto werr;
+#ifdef _WIN32
+ /* check if using read-only encoding for saving */
+ if (otype == REDIS_LIST ||
+ otype == REDIS_SET ||
+ otype == REDIS_ZSET ||
+ otype == REDIS_HASH) {
+ o = (robj *)getRoConvertedObj(keystr, o);
+ }
+#endif
if (rdbSaveObject(fp,o) == -1) goto werr;
}
- dictReleaseIterator(di);
+ roDictReleaseIterator(di);
}
/* EOF opcode */
if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
@@ -470,10 +597,32 @@ int rdbSave(char *filename) {
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
- if (di) dictReleaseIterator(di);
+ if (di) roDictReleaseIterator(di);
return REDIS_ERR;
}
+#ifdef _WIN32
+int rdbSaveBackground(char *filename) {
+ if (server.bgsavechildpid != -1) return REDIS_ERR;
+ if (server.bgrewritechildpid != -1) return REDIS_ERR;
+ server.dirty_before_bgsave = server.dirty;
+
+ server.bgsavechildpid = getpid();
+ if (bkgdsave_start(filename, rdbSave) == -1) {
+ /* couldn't do in background. Do it in foreground */
+ redisLog(REDIS_WARNING,"Background save failed. Trying foreground");
+ server.rdbbkgdfsave.background = 0;
+ if (rdbSave(filename) == REDIS_OK) {
+ backgroundSaveDoneHandler(0);
+ return REDIS_OK;
+ } else {
+ backgroundSaveDoneHandler(0x100);
+ return REDIS_ERR;
+ }
+ }
+ return REDIS_OK;
+}
+#else
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
@@ -483,14 +632,7 @@ int rdbSaveBackground(char *filename) {
start = ustime();
if ((childpid = fork()) == 0) {
/* Child */
-#ifdef _WIN32
- if (server.ipfd > 0) {
- aeWinSocketDetach(server.ipfd, 0);
- closesocket(server.ipfd);
- }
-#else
if (server.ipfd > 0) close(server.ipfd);
-#endif
if (server.sofd > 0) close(server.sofd);
if (rdbSave(filename) == REDIS_OK) {
_exit(0);
@@ -501,27 +643,9 @@ int rdbSaveBackground(char *filename) {
/* Parent */
server.stat_fork_time = ustime()-start;
if (childpid == -1) {
-#ifdef _WIN32
- /* On WIN32 fork() is empty function which always return -1 */
- /* So, on WIN32, let's just save in foreground. */
- redisLog(REDIS_NOTICE,"Foregroud saving started by pid %d", getpid());
- server.bgsavechildpid = getpid();
- updateDictResizePolicy();
-
- if (rdbSave(filename) == REDIS_OK) {
- backgroundSaveDoneHandler(0);
- return REDIS_OK;
- } else {
- redisLog(REDIS_WARNING,"Can't save in background: spoon err: %s",
- strerror(errno));
- backgroundSaveDoneHandler(0xff);
- return REDIS_ERR;
- }
-#else
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
-#endif
}
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
server.bgsavechildpid = childpid;
@@ -530,6 +654,7 @@ int rdbSaveBackground(char *filename) {
}
return REDIS_OK; /* unreached */
}
+#endif
void rdbRemoveTempFile(pid_t childpid) {
char tmpfile[256];
@@ -1041,6 +1166,11 @@ void backgroundSaveDoneHandler(int statloc) {
"Background saving terminated by signal %d", WTERMSIG(statloc));
rdbRemoveTempFile(server.bgsavechildpid);
}
+#ifdef _WIN32
+ server.rdbbkgdfsave.state = BKSAVE_IDLE;
+ /* turn off copy on write */
+ cowBkgdSaveStop();
+#endif
server.bgsavechildpid = -1;
/* Possibly there are slaves waiting for a BGSAVE in order to be served
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
View
33 src/redis.c
@@ -632,6 +632,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Check if a background saving or AOF rewrite in progress terminated */
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
+#ifdef _WIN32
+ if (server.rdbbkgdfsave.state == BKSAVE_SUCCESS) {
+ if (server.bgsavechildpid != -1) {
+ backgroundSaveDoneHandler(0);
+ } else {
+ backgroundRewriteDoneHandler(0);
+ }
+ } else if (server.rdbbkgdfsave.state == BKSAVE_FAILED) {
+ if (server.bgsavechildpid != -1) {
+ backgroundSaveDoneHandler(0x100);
+ } else {
+ backgroundRewriteDoneHandler(0x100);
+ }
+ }
+#else
int statloc;
pid_t pid;
@@ -643,6 +658,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
updateDictResizePolicy();
}
+#endif
} else {
time_t now = time(NULL);
@@ -656,13 +672,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, sp->seconds);
rdbSaveBackground(server.dbfilename);
-#ifdef _WIN32
- /* On windows this will save in foreground and block */
- /* Here we are allready saved, and we should return */
- return 100;
-#else
break;
-#endif
}
}
@@ -1158,7 +1168,11 @@ int prepareForShutdown() {
overwrite the synchronous saving did by SHUTDOWN. */
if (server.bgsavechildpid != -1) {
redisLog(REDIS_WARNING,"There is a child saving an .rdb. Killing it!");
+#ifdef _WIN32
+ bkgdsave_termthread();
+#else
kill(server.bgsavechildpid,SIGKILL);
+#endif
rdbRemoveTempFile(server.bgsavechildpid);
}
if (server.appendonly) {
@@ -1167,7 +1181,11 @@ int prepareForShutdown() {
if (server.bgrewritechildpid != -1) {
redisLog(REDIS_WARNING,
"There is a child rewriting the AOF. Killing it!");
+#ifdef _WIN32
+ bkgdsave_termthread();
+#else
kill(server.bgrewritechildpid,SIGKILL);
+#endif
}
/* Append only file: fsync() the AOF and exit */
redisLog(REDIS_NOTICE,"Calling fsync() on the AOF file.");
@@ -1796,6 +1814,9 @@ int main(int argc, char **argv) {
}
if (server.daemonize) daemonize();
initServer();
+#ifdef _WIN32
+ cowInit();
+#endif
if (server.daemonize) createPidFile();
redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
View
21 src/redis.h
@@ -20,6 +20,7 @@
#include <errno.h>
#ifdef _WIN32
#include "win32fixes.h"
+#include "win32_bksv.h"
#else
#include <pthread.h>
#include <syslog.h>
@@ -37,6 +38,8 @@
#include "version.h"
#include "util.h"
+#include "win32_cow.h"
+
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1
@@ -103,6 +106,11 @@
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
+#ifdef _WIN32
+#define REDIS_ENCODING_HTARRAY 12 /* read-only dict array for bgsave */
+#define REDIS_ENCODING_LINKEDLISTARRAY 13 /* read-only list array for bgsave */
+#define REDIS_ENCODING_HTZARRAY 14 /* read-only zset dict array for bgsave */
+#endif
/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
@@ -412,6 +420,15 @@ struct redisServer {
char *pidfile;
pid_t bgsavechildpid;
pid_t bgrewritechildpid;
+#ifdef _WIN32
+ bkgdfsave rdbbkgdfsave;
+ dict *cowDictCopied;
+ dict *cowDictConverted;
+ int isBackgroundSaving;
+ bkgdDbExt *cowSaveDbExt;
+ redisDb *cowSaveDb;
+ bkgditers cowCurIters;
+#endif
sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
sds aofbuf; /* AOF buffer, written before entering the event loop */
struct saveparam *saveparams;
@@ -731,6 +748,10 @@ off_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, FILE *fp);
void backgroundSaveDoneHandler(int statloc);
int getObjectSaveType(robj *o);
+#ifdef _WIN32
+robj *cowEnsureWriteCopy(redisDb *db, robj *key, robj *val);
+void cowEnsureExpiresCopy(redisDb *db);
+#endif
/* AOF persistence */
void flushAppendOnlyFile(int force);
View
49 src/replication.c
@@ -77,36 +77,6 @@ void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc)
decrRefCount(cmdobj);
}
-#ifdef _WIN32
-/* Win32 may fail in renaming file while it is being read for sending */
-/* if some client is doing bulk transfer, try again when it completes */
-int DelayBkgdSaveForReplication() {
- listIter li;
- int sending = 0;
- listNode *ln;
-
- listRewind(server.slaves,&li);
- while((ln = listNext(&li))) {
- redisClient *slave = ln->value;
- if (slave->repldbfd != -1) {
- sending++;
- }
- }
- if (sending > 0) {
- listRewind(server.slaves,&li);
- while((ln = listNext(&li))) {
- redisClient *slave = ln->value;
- if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
- slave->replstate = REDIS_REPL_WAIT_BGSAVE_START;
- }
- }
- redisLog(REDIS_VERBOSE,"Delay starting bgsave for replication");
- return 1;
- }
- return 0;
-}
-#endif
-
void syncCommand(redisClient *c) {
/* ignore SYNC if aleady slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
@@ -156,11 +126,6 @@ void syncCommand(redisClient *c) {
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {
-#ifdef _WIN32
- if (DelayBkgdSaveForReplication() > 0) {
- c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
- } else {
-#endif
/* Ok we don't have a BGSAVE in progress, let's start one */
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
@@ -169,18 +134,11 @@ void syncCommand(redisClient *c) {
return;
}
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
-#ifdef _WIN32
- }
-#endif
}
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;
c->slaveseldb = 0;
listAddNodeTail(server.slaves,c);
-#ifdef _WIN32
- /* Since WIN32 won't fork(), but instead do Save() we must manualy call this */
- updateSlavesWaitingBgsave(REDIS_OK);
-#endif
return;
}
@@ -213,8 +171,6 @@ void sendBulkToSlaveDataDone(aeEventLoop *el, int fd, void *privdata, int nwritt
}
addReplySds(slave,sdsempty());
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
- /* we have have delayed other clients. */
- updateSlavesWaitingBgsave(REDIS_OK);
}
}
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -366,11 +322,6 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
}
}
if (startbgsave) {
-#ifdef _WIN32
- if (DelayBkgdSaveForReplication() > 0) {
- return;
- }
-#endif
if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
listIter li;
View
6 src/t_list.c
@@ -306,6 +306,12 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,REDIS_LIST)) return;
+#ifdef _WIN32
+ /* need this because does not call lookupKeyWriteOrReply() */
+ if (subject && server.isBackgroundSaving) {
+ subject = cowEnsureWriteCopy(c->db, c->argv[1], subject);
+ }
+#endif
if (refval != NULL) {
/* Note: we expect refval to be string-encoded because it is *not* the
View
15 src/t_string.c
@@ -120,8 +120,13 @@ void setbitCommand(redisClient *c) {
} else {
if (checkType(c,o,REDIS_STRING)) return;
+#ifdef _WIN32
+ /* Create a copy when the object is shared or encoded or COW is on. */
+ if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW || server.isBackgroundSaving == 1) {
+#else
/* Create a copy when the object is shared or encoded. */
if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW) {
+#endif
robj *decoded = getDecodedObject(o);
o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
@@ -218,8 +223,13 @@ void setrangeCommand(redisClient *c) {
if (checkStringLength(c,offset+sdslen(value)) != REDIS_OK)
return;
+#ifdef _WIN32
+ /* Create a copy when the object is shared or encoded or COW is on. */
+ if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW || server.isBackgroundSaving == 1) {
+#else
/* Create a copy when the object is shared or encoded. */
if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW) {
+#endif
robj *decoded = getDecodedObject(o);
o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
@@ -399,8 +409,13 @@ void appendCommand(redisClient *c) {
if (checkStringLength(c,totlen) != REDIS_OK)
return;
+#ifdef _WIN32
+ /* If the object is shared or encoded or COW is on, we have to make a copy */
+ if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW || server.isBackgroundSaving == 1) {
+#else
/* If the object is shared or encoded, we have to make a copy */
if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW) {
+#endif
robj *decoded = getDecodedObject(o);
o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
View
130 src/win32_bksv.c
@@ -0,0 +1,130 @@
+
+#include "redis.h"
+#include "win32_wsiocp.h"
+
+#ifdef _WIN32
+
+DWORD WINAPI BkgdSaveThreadProc(LPVOID param);
+void bkgdsave_cleanup();
+
+
+/* start a background save using a windows thread.
+ * used for rdb save and aof save */
+int bkgdsave_start(const char *filename, int (*bkgdfsave_serialize)(char *)) {
+ if (server.rdbbkgdfsave.state != BKSAVE_IDLE) {
+ /* only one background activity at a time is allowed */
+ errno = EINVAL;
+ return -1;
+ }
+ server.rdbbkgdfsave.state = BKSAVE_WRITING;
+ cowBkgdSaveStart();
+
+ if (server.rdbbkgdfsave.thread == NULL) {
+ server.rdbbkgdfsave.dosaveevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (server.rdbbkgdfsave.dosaveevent == NULL) {
+ goto failed;
+ }
+
+ server.rdbbkgdfsave.terminateevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (server.rdbbkgdfsave.terminateevent == NULL) {
+ goto failed;
+ }
+
+ server.rdbbkgdfsave.thread = CreateThread(NULL, 0, BkgdSaveThreadProc, &server.rdbbkgdfsave, 0, NULL);
+ if (server.rdbbkgdfsave.thread == NULL) {
+ goto failed;
+ }
+ }
+
+ server.rdbbkgdfsave.filename = (char*)zmalloc(strlen(filename) + 1);
+ strcpy(server.rdbbkgdfsave.filename, filename);
+ server.rdbbkgdfsave.bkgdfsave_serialize = bkgdfsave_serialize;
+
+ /* signal background thread to run */
+ SetEvent(server.rdbbkgdfsave.dosaveevent);
+ return REDIS_OK;
+
+failed:
+ bkgdsave_cleanup();
+ errno = EINVAL;
+ return -1;
+}
+
+/* terminate the background save thread */
+int bkgdsave_termthread() {
+ if (server.rdbbkgdfsave.terminateevent != NULL && server.rdbbkgdfsave.thread != NULL) {
+ SetEvent(server.rdbbkgdfsave.terminateevent);
+ WaitForSingleObject(server.rdbbkgdfsave.thread, INFINITE);
+ }
+ bkgdsave_cleanup();
+ return 0;
+}
+
+
+/* cleanup state for thread termination */
+void bkgdsave_cleanup() {
+ if (server.rdbbkgdfsave.dosaveevent != NULL) {
+ CloseHandle(server.rdbbkgdfsave.dosaveevent);
+ server.rdbbkgdfsave.dosaveevent = NULL;
+ }
+ if (server.rdbbkgdfsave.terminateevent != NULL) {
+ CloseHandle(server.rdbbkgdfsave.terminateevent);
+ server.rdbbkgdfsave.terminateevent = NULL;
+ }
+ if (server.rdbbkgdfsave.thread != NULL) {
+ CloseHandle(server.rdbbkgdfsave.thread);
+ server.rdbbkgdfsave.thread = NULL;
+ }
+ if (server.rdbbkgdfsave.filename != NULL) {
+ zfree(server.rdbbkgdfsave.filename);
+ server.rdbbkgdfsave.filename = NULL;
+ }
+ if (server.rdbbkgdfsave.tmpname != NULL) {
+ zfree(server.rdbbkgdfsave.tmpname);
+ server.rdbbkgdfsave.tmpname = NULL;
+ }
+}
+
+/* initialize the background save state */
+void bkgdsave_init() {
+ server.rdbbkgdfsave.dosaveevent = NULL;
+ server.rdbbkgdfsave.terminateevent = NULL;
+ server.rdbbkgdfsave.thread = NULL;
+ server.rdbbkgdfsave.state = BKSAVE_IDLE;
+ server.rdbbkgdfsave.filename = NULL;
+ server.rdbbkgdfsave.tmpname = NULL;
+}
+
+
+/* background thread to write buffers to disk */
+DWORD WINAPI BkgdSaveThreadProc(LPVOID param) {
+ HANDLE workorterm[2];
+ int rc = REDIS_OK;
+
+ workorterm[0] = server.rdbbkgdfsave.terminateevent;
+ workorterm[1] = server.rdbbkgdfsave.dosaveevent;
+
+ while (1) {
+
+ DWORD ev = WaitForMultipleObjects(2, workorterm, FALSE, INFINITE);
+ if (ev != (WAIT_OBJECT_0 + 1)) {
+ /* terminate or unexpected return, do exit */
+ bkgdsave_cleanup();
+ return 0;
+ }
+
+ /* start saving data into buffers */
+ server.rdbbkgdfsave.background = 1;
+ rc = server.rdbbkgdfsave.bkgdfsave_serialize(server.rdbbkgdfsave.filename);
+ server.rdbbkgdfsave.background = 0;
+
+ if (rc == REDIS_OK)
+ server.rdbbkgdfsave.state = BKSAVE_SUCCESS;
+ else
+ server.rdbbkgdfsave.state = BKSAVE_FAILED;
+ }
+
+ return 0;
+}
+
+#endif
View
38 src/win32_bksv.h
@@ -0,0 +1,38 @@
+#ifndef __W32BKSAV_H__
+#define __W32BKSAV_H__
+
+#ifdef _WIN32
+
+#include "adlist.h"
+
+#define BKSAVE_IDLE 0
+#define BKSAVE_BUFFERING 1
+#define BKSAVE_WRITING 3
+#define BKSAVE_SUCCESS 4
+#define BKSAVE_FAILED 5
+
+
+/* each buffer has a current postion and remaining space */
+typedef struct bkgdfsavehdr {
+ size_t pos;
+ size_t rem;
+} bkgdfsavehdr;
+
+typedef struct bkgdfsave {
+ int background;
+ int state;
+ HANDLE dosaveevent;
+ HANDLE terminateevent;
+ HANDLE thread;
+ char *filename;
+ char *tmpname;
+ int (*bkgdfsave_serialize)(char *);
+} bkgdfsave;
+
+void bkgdsave_init();
+int bkgdsave_start(const char *filename, int (*bkgdfsave_serialize)(char *));
+int bkgdsave_termthread();
+
+#endif
+
+#endif
View
922 src/win32_cow.c
@@ -0,0 +1,922 @@
+/************************************************************************
+ * This module implements copy on write to support
+ * saving on a background thread in Windows.
+ *
+ * Collection objects (dictionaries, lists, sets, zsets)
+ * are copied to a read-only form if a command to modify the
+ * collection is started. This is triggered via lookupKeyWrite().
+ *
+ * Objects which are modified in place - ziplist, zipset, etc.
+ * are copied before being modified.
+ * Strings are normally copied before being modified.
+ *
+ * In addition deletion of objects is deferred until the save is completed.
+ * This is done by modifying the dictionary delete function, and also
+ * by modifying the decrRefCount function.
+ *
+ * To allow conversion of collections while the save is iterating on them
+ * special iterators are used. These iterators can be migrated
+ * from their normal mode to iterating over a read-only collection.
+ * Locking is used so that iterator can be used from 2 threads.
+ * For migration to work properly, only one save at a time may run.
+ * (this restriction was already imposed in the Redis code)
+ *
+ ************************************************************************/
+#include "redis.h"
+
+#ifdef _WIN32
+
+/* list of objects to be deleted */
+list *deferSdsDelete = NULL;
+list *deferObjDelete = NULL;
+
+/* COW locking */
+void cowLock() {
+ EnterCriticalSection(&server.cowCurIters.csMigrate);
+}
+
+void cowUnlock() {
+ LeaveCriticalSection(&server.cowCurIters.csMigrate);
+}
+
+/* read only iterator migration
+ * switches from actual collection to readonly array */
+void roDBMigrateIterator(dict *d, cowDictArray *ar) {
+ cowLock();
+ if (server.cowCurIters.curDbDictIter != NULL &&
+ server.cowCurIters.curDbDictIter->hdict == d) {
+ server.cowCurIters.curDbDictIter->ar = ar;
+ }
+ cowUnlock();
+}
+
+void roDictMigrateIterator(dict *d, cowDictArray *ar) {
+ cowLock();
+ if (server.cowCurIters.curObjDictIter != NULL &&
+ server.cowCurIters.curObjDictIter->hdict == d) {
+ server.cowCurIters.curObjDictIter->ar = ar;
+ }
+ cowUnlock();
+}
+
+void roZDictMigrateIterator(dict *d, cowDictZArray *ar) {
+ cowLock();
+ if (server.cowCurIters.curObjZDictIter != NULL &&
+ server.cowCurIters.curObjZDictIter->hdict == d) {
+ server.cowCurIters.curObjZDictIter->ar = ar;
+ }
+ cowUnlock();
+}
+
+void roListMigrateIterator(list *l, cowListArray *ar) {
+ cowLock();
+ if (server.cowCurIters.curObjListIter != NULL &&
+ server.cowCurIters.curObjListIter->olist == l) {
+ server.cowCurIters.curObjListIter->ar = ar;
+ }
+ cowUnlock();
+}
+
+
+/* put object in list if deletes are deferred.
+ * return 1 if deferred, 0 otherwise */
+int deferFreeObject(void *obj) {
+ if (deferObjDelete != NULL && server.isBackgroundSaving == 1) {
+ listAddNodeHead(deferObjDelete, obj);
+ return 1;
+ }
+ return 0;
+}
+
+/* dictionary destructors for defering delete */
+void dictDbKeyDeferDestructor(void *privdata, void *val) {
+ DICT_NOTUSED(privdata);
+
+ listAddNodeHead(deferSdsDelete, val);
+}
+
+static void dictDbValDeferDestructor(void *privdata, void *val) {
+ DICT_NOTUSED(privdata);
+
+ listAddNodeHead(deferObjDelete, val);
+}
+
+
+/* dictionary types */
+static void dictSdsDestructor(void *privdata, void *val)
+{
+ DICT_NOTUSED(privdata);
+
+ sdsfree((sds)val);
+}
+
+static unsigned int dictSdsHash(const void *key) {
+ return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
+}
+
+static int dictSdsKeyCompare(void *privdata, const void *key1,
+ const void *key2)
+{
+ int l1,l2;
+ DICT_NOTUSED(privdata);
+
+ l1 = sdslen((sds)key1);
+ l2 = sdslen((sds)key2);
+ if (l1 != l2) return 0;
+ return memcmp(key1, key2, l1) == 0;
+}
+static void dictRedisObjectDestructor(void *privdata, void *val)
+{
+ DICT_NOTUSED(privdata);
+
+ decrRefCount(val);
+}
+
+/* dictionary types used to support copy on write */
+dictType ptrDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ NULL /* val destructor */
+};
+
+dictType dbDeferDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictDbKeyDeferDestructor, /* key destructor */
+ dictRedisObjectDestructor /* val destructor */
+};
+
+dictType dbReadOnlyDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ NULL /* val destructor */
+};
+
+dictType copiedCollectionDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ dictRedisObjectDestructor /* val destructor */
+};
+
+
+/* convert a linked list encoding to a list array encoding */
+cowListArray *cowConvertListToArray(list *olist) {
+ listIter li;
+ listNode *ln;
+ cowListArray *lar;
+ listNode *lnNew;
+ listNode *lnPrev;
+ unsigned int ix = 0;
+
+ lar = (cowListArray *)zmalloc(sizeof(cowListArray) + (sizeof(listNode) * olist->len));
+
+ /* add copy of each item from old list */
+ listRewind(olist,&li);
+ lnNew = &lar->le[0];
+ lnPrev = NULL;
+ while((ln = listNext(&li)) && ix < olist->len) {
+ /* copy object value to array list
+ Do not incr ref count. */
+ lnNew->value = listNodeValue(ln);
+ lnNew->prev = lnPrev;
+ if (lnPrev != NULL) {
+ lnPrev->next = lnNew;
+ }
+ lnPrev = lnNew;
+ lnNew++;
+ ix++;
+ }
+ if (lnPrev != NULL) {
+ lnPrev->next = NULL;
+ }
+ lar->numele = ix;
+ return lar;
+}
+
+void cowReleaseListArray(cowListArray *ar) {
+ zfree(ar);
+}
+
+/* convert a hash dictionary encoding to a dictionary array encoding */
+cowDictArray *cowConvertDictToArray(dict *hdict) {
+ dictIterator * di;
+ dictEntry *de;
+ int dsize;
+ cowDictArray *dar;
+ int dcount = 0;
+ dictEntry *deNew;
+ dictEntry *dePrev;
+
+ /* create copy */
+ dsize = dictSize(hdict) > dictSlots(hdict) ? dictSize(hdict) : dictSlots(hdict);
+ dar = (cowDictArray *)zmalloc(sizeof(cowDictArray) + (dsize * sizeof(dictEntry)));
+
+ /* copy all entries without refcounting or copying values */
+ /* can't just memcpy the whole dictionary because entries are allocated */
+ di = dictGetSafeIterator(hdict);
+ deNew = &dar->de[0];
+ dePrev = NULL;
+ while((de = dictNext(di)) != NULL && dcount < dsize) {
+ /* copy object value to dict array
+ Do not incr ref count. */
+ deNew->val = de->val;
+ deNew->key = de->key;
+ /* fix next ptr of prev entry */
+ if (dePrev != NULL) {
+ dePrev->next = deNew;
+ }
+ dePrev = deNew;
+ deNew++;
+ dcount++;
+ }
+ if (dePrev != NULL) {
+ dePrev->next = NULL;
+ }
+ dar->numele = dcount;
+ dictReleaseIterator(di);
+ return dar;
+}
+
+void cowReleaseDictArray(cowDictArray *ar) {
+ zfree(ar);
+}
+
+/* convert a hash dictionary encoding to a dictionary array encoding */
+cowDictZArray *cowConvertDictToZArray(dict *hdict) {
+ dictIterator * di;
+ dictEntry *de;
+ int dsize;
+ cowDictZArray *dar;
+ int dcount = 0;
+ dictZEntry *dezNew;
+ dictZEntry *dezPrev;
+
+ /* create copy */
+ dsize = dictSize(hdict) > dictSlots(hdict) ? dictSize(hdict) : dictSlots(hdict);
+ dar = (cowDictZArray *)zmalloc(sizeof(cowDictZArray) +
+ (dsize * sizeof(dictZEntry)) );
+
+ /* copy all entries without refcounting or copying values */
+ /* can't just memcpy the whole dictionary because entries are allocated */
+ di = dictGetSafeIterator(hdict);
+ dezNew = &dar->zde[0];
+ dezPrev = NULL;
+ while((de = dictNext(di)) != NULL && dcount < dsize) {
+ double *score = (double *)dictGetEntryVal(de);
+ /* copy score value into array
+ and point val to score. */
+ dezNew->de.key = de->key;
+ dezNew->score = *score;
+ dezNew->de.val = &dezNew->score;
+ /* fix next ptr of prev entry */
+ if (dezPrev != NULL) {
+ dezPrev->de.next = &dezNew->de;
+ }
+ dezPrev = dezNew;
+ dezNew++;
+ dcount++;
+ }
+ if (dezPrev != NULL) {
+ dezPrev->de.next = NULL;
+ }
+ dar->numele = dcount;
+ dictReleaseIterator(di);
+ return dar;
+}
+
+void cowReleaseDictZArray(cowDictZArray *ar) {
+ zfree(ar);
+}
+
+/* convert a linked list encoding to a list array encoding */
+robj *cowListCopy(robj *val) {
+ long long sttime;
+ robj *newval;
+ sttime = ustime();
+ if (val->encoding == REDIS_ENCODING_ZIPLIST) {
+ size_t bytes;
+ redisLog(REDIS_NOTICE, "cowListCopy REDIS_ENCODING_ZIPLIST");
+ newval = createZiplistObject();
+ /* do raw memory copy */
+ bytes = ziplistBlobLen(val->ptr);
+ newval->ptr = zrealloc(newval->ptr, bytes);
+ memcpy(newval->ptr, val->ptr, bytes);
+
+ return newval;
+ } else if (val->encoding == REDIS_ENCODING_LINKEDLIST) {
+ list *list = val->ptr;
+ cowListArray *lar;
+
+ redisLog(REDIS_NOTICE, "cowListCopy REDIS_ENCODING_LINKEDLIST");
+ lar = cowConvertListToArray(list);
+ newval = createObject(REDIS_LIST, lar);
+ newval->encoding = REDIS_ENCODING_LINKEDLISTARRAY;
+
+ return newval;
+ } else {
+ /* error. unexpected encoding */
+ return NULL;
+ }
+}
+
+/* convert a hash dictionary encoding to a dictionary array encoding */
+robj *cowSetCopy(robj *val) {
+ robj *newval;
+ if (val->encoding == REDIS_ENCODING_INTSET) {
+ size_t bytes;
+ redisLog(REDIS_NOTICE, "cowSetCopy REDIS_ENCODING_INTSET");
+ newval = createIntsetObject();
+ /* do raw memory copy */
+ bytes = intsetBlobLen(val->ptr);
+ newval->ptr = zrealloc(newval->ptr, bytes);
+ memcpy(newval->ptr, val->ptr, bytes);
+ return newval;
+ } else if (val->encoding == REDIS_ENCODING_HT) {
+ dict *olddict = (dict *)val->ptr;
+ cowDictArray *dar;
+
+ redisLog(REDIS_NOTICE, "cowSetCopy REDIS_ENCODING_HT");
+ dar = cowConvertDictToArray(olddict);
+ newval = createObject(REDIS_SET, dar);
+ newval->encoding = REDIS_ENCODING_HTARRAY;
+
+ return newval;
+ } else {
+ /* error. unexpected encoding */
+ return NULL;
+ }
+ return NULL;
+}
+
+/* convert a hash dictionary encoding to a dictionary array encoding */
+robj *cowZSetCopy(robj *val) {
+ robj *newval;
+ if (val->encoding == REDIS_ENCODING_ZIPLIST) {
+ size_t bytes;
+ redisLog(REDIS_NOTICE, "cowZSetCopy REDIS_ENCODING_ZIPLIST");
+ newval = createZsetZiplistObject();
+ /* do raw memory copy */
+ bytes = ziplistBlobLen(val->ptr);
+ newval->ptr = zrealloc(newval->ptr, bytes);
+ memcpy(newval->ptr, val->ptr, bytes);
+ return newval;
+ } else if (val->encoding == REDIS_ENCODING_SKIPLIST) {
+ zset *oldzs = (zset *)val->ptr;
+ cowDictZArray *dar;
+
+ redisLog(REDIS_NOTICE, "cowZSetCopy REDIS_ENCODING_SKIPLIST");
+ dar = cowConvertDictToZArray(oldzs->dict);
+ newval = createObject(REDIS_ZSET, dar);
+ newval->encoding = REDIS_ENCODING_HTZARRAY;
+
+ return newval;
+ } else {
+ /* error. unexpected encoding */
+ return NULL;
+ }
+ return NULL;
+}
+
+/* convert a hash dictionary encoding to a dictionary array encoding */
+robj *cowHashCopy(robj *val) {
+ robj *newval = createHashObject();
+ if (val->encoding == REDIS_ENCODING_ZIPMAP) {
+ size_t bytes;
+ redisLog(REDIS_NOTICE, "cowHashCopy REDIS_ENCODING_ZIPMAP");
+ /* do raw memory copy */
+ bytes = zipmapBlobLen(val->ptr);
+ newval->ptr = zrealloc(newval->ptr, bytes);
+ memcpy(newval->ptr, val->ptr, bytes);
+ return newval;
+ } else if (val->encoding == REDIS_ENCODING_HT) {
+ dict *olddict = (dict *)val->ptr;
+ cowDictArray *dar;
+
+ redisLog(REDIS_NOTICE, "cowHashCopy REDIS_ENCODING_HT");
+ dar = cowConvertDictToArray(olddict);
+ newval = createObject(REDIS_HASH, dar);
+ newval->encoding = REDIS_ENCODING_HTARRAY;
+
+ return newval;
+ } else {
+ /* error. unexpected encoding */
+ return NULL;
+ }
+ return NULL;
+}
+
+/* Make a readonly version of a dictionary of redis objects
+ and make the existing dictionary not delete objects */
+cowDictArray *copyReadonly_dictobj(dict *curdict, bkgdDbExt *extDict) {
+ cowDictArray *dar;
+
+ /* checks if copy needed. else return curdict */
+ if (server.isBackgroundSaving == 0 || server.cowDictCopied == NULL) {
+ return NULL;
+ }
+
+ /* create copy */
+ dar = cowConvertDictToArray(curdict);
+
+ if (extDict != NULL) {
+ /* fix types to not delete while saving */
+ extDict->savedType = curdict->type;
+ curdict->type = extDict->cowType;
+ }
+
+ return dar;
+}
+
+/* if copy on write active, then ensure there is a
+ copy of the value that is safe to modify or delete,
+ and update DB dict entry to refer to this value*/
+robj *cowEnsureWriteCopy(redisDb *db, robj *key, robj *val) {
+ long long sttime;
+
+ if (server.isBackgroundSaving == 0 ||
+ server.cowDictCopied == NULL) {
+ /* no copy needed */
+ return val;
+ } else {
+ int added = 0;
+ sds keyname;
+ robj *newval = NULL;
+
+ sttime = ustime();
+ /* first ensure DB dict readonly copy exists */
+ cowLock();
+ if (server.cowSaveDbExt[db->id].dictArray == NULL) {
+ /* make clone with modified cow destructors for db dict */
+ server.cowSaveDbExt[db->id].dictArray = copyReadonly_dictobj(server.db[db->id].dict,
+ &server.cowSaveDbExt[db->id]);
+
+ /* migrate iterator */
+ roDBMigrateIterator(server.db[db->id].dict, server.cowSaveDbExt[db->id].dictArray);
+ }
+ cowUnlock();
+
+ if (val == NULL || key == NULL) {
+ return NULL;
+ }
+ if (dictFind(server.cowDictCopied, (sds)key->ptr) != NULL) {
+ /* already copied */
+ return val;
+ }
+
+ /* need to duplicate object, add key to cowDictCopied,
+ add original to deferred delete list, and update db entry */
+ cowLock();
+ switch (val->type) {
+ case REDIS_STRING:
+ /* updates always duplicate, original uses defered delete destructor */
+ break;
+ case REDIS_LIST:
+ newval = cowListCopy(val);
+ break;
+ case REDIS_SET:
+ newval = cowSetCopy(val);
+ break;
+ case REDIS_ZSET:
+ newval = cowZSetCopy(val);
+ break;
+ case REDIS_HASH:
+ newval = cowHashCopy(val);
+ break;
+ default:
+ break;
+ }
+ cowUnlock();
+
+ if (newval == NULL) {
+ /* no duplicate needed. return original */
+ return val;
+ }
+
+ /* add key to copied dictionary to avoid extra copies */
+ keyname = sdsdup((sds)key->ptr);
+ dictAdd(server.cowDictCopied, keyname, NULL);
+
+ /* rewitten iterators are added to converted for lookup
+ * during save. For other objects, replace DB entry */
+ if (newval->encoding == REDIS_ENCODING_HTARRAY ||
+ newval->encoding == REDIS_ENCODING_LINKEDLISTARRAY ||
+ newval->encoding == REDIS_ENCODING_HTZARRAY) {
+ cowLock();
+ /* add value to converted dictionary for iterator lookup */
+ dictAdd(server.cowDictConverted, keyname, newval);
+
+ /* migrate current iterator */
+ if (newval->encoding == REDIS_ENCODING_HTARRAY) {
+ roDictMigrateIterator((dict *)val->ptr, (cowDictArray *)newval->ptr);
+ } else if (newval->encoding == REDIS_ENCODING_LINKEDLISTARRAY) {
+ roListMigrateIterator((list *)val->ptr, (cowListArray *)newval->ptr);
+ } else if (newval->encoding == REDIS_ENCODING_HTZARRAY) {
+ roZDictMigrateIterator((dict *)val->ptr, (cowDictZArray *)newval->ptr);
+ }
+ cowUnlock();
+ } else {
+ /* replace active DB entry */
+ dbOverwrite(db, key, newval);
+ val = newval;
+ }
+
+ redisLog(REDIS_NOTICE, "elapsed COW time %d", (unsigned int)(ustime() - sttime));
+ return val;
+ }
+}
+
+
+/* copy a dictionary of redis objects
+ Assumes copied directory uses COW destructors */
+dict *copyonwrite_dictobj(dict *curdict, bkgdDbExt *extDict) {
+ dict *newdict;
+ dictIterator * di;
+ dictEntry *de;
+
+ /* checks if copy needed. else return curdict */
+ if (server.isBackgroundSaving == 0 || server.cowDictCopied == NULL) {
+ return curdict;
+ }
+
+ /* create copy */
+ newdict = dictCreate(curdict->type, curdict->privdata);
+ if (newdict != NULL) {
+ /* copy all entries without refcounting or copying values */
+ /* can't just memcpy the whole dictionary because entries are allocated */
+ di = dictGetSafeIterator(curdict);
+ while((de = dictNext(di)) != NULL) {
+ dictAdd(newdict, de->key, de->val);
+ }
+ dictReleaseIterator(di);
+
+ if (extDict != NULL) {
+ /* fix types to not delete while saving */
+ extDict->savedType = newdict->type;
+ newdict->type = extDict->cowType;
+ curdict->type = extDict->readonlyType;
+ }
+ }
+
+ return newdict;
+}
+
+
+void restore_dictobj(dict *curdict, bkgdDbExt *extDict) {
+ if (extDict != NULL && extDict->savedType != NULL) {
+ curdict->type = extDict->savedType;
+ extDict->savedType = NULL;
+ }
+}
+
+
+/* if copy on write active, then ensure there is a
+ copy of the value that is safe to modify or delete,
+ and update DB dict entry to refer to this value*/
+void cowEnsureExpiresCopy(redisDb *db) {
+ long long sttime;
+
+ if (server.isBackgroundSaving == 0 ||
+ server.cowDictCopied == NULL ||
+ server.cowSaveDb[db->id].expires == NULL) {
+ /* no copy needed */
+ return;
+ } else {
+ /* ensure DB expires is copied */
+ if (server.cowSaveDb[db->id].expires == server.db[db->id].expires) {
+ sttime = ustime();
+ server.db[db->id].expires = copyonwrite_dictobj(server.cowSaveDb[db->id].expires, NULL);
+ redisLog(REDIS_NOTICE, "elapsed COW DB expires time %d", (unsigned int)(ustime() - sttime));
+ }
+ }
+}
+
+/* global init function */
+void cowInit(void) {
+ int j;
+ redisLog(REDIS_NOTICE, "cowInit");
+ server.isBackgroundSaving = 0;
+ server.cowDictCopied = NULL;
+ server.cowDictConverted = NULL;
+ server.cowSaveDbExt = (bkgdDbExt *)zmalloc(sizeof(bkgdDbExt)*server.dbnum);
+ server.cowSaveDb = (redisDb *)zmalloc(sizeof(redisDb)*server.dbnum);
+
+ deferSdsDelete = listCreate();
+ deferObjDelete = listCreate();
+
+ for (j = 0; j < server.dbnum; j++) {
+ server.cowSaveDb[j].dict = NULL;
+ server.cowSaveDb[j].expires = NULL;
+ server.cowSaveDb[j].blocking_keys = NULL;
+ server.cowSaveDb[j].watched_keys = NULL;
+ server.cowSaveDb[j].id = j;
+ server.cowSaveDbExt[j].savedType = NULL;
+ server.cowSaveDbExt[j].cowType = &dbDeferDictType;
+ server.cowSaveDbExt[j].readonlyType = &dbDeferDictType;
+ server.cowSaveDbExt[j].dictArray = NULL;
+ server.cowSaveDbExt[j].id = j;
+ }
+
+ server.cowCurIters.curDbDictIter = NULL;
+ server.cowCurIters.curObjDictIter = NULL;
+ server.cowCurIters.curObjListIter = NULL;
+ server.cowCurIters.curObjZDictIter = NULL;
+ InitializeCriticalSectionAndSpinCount(&server.cowCurIters.csMigrate, 500);
+
+}
+
+/* release memory allocated for copy on write during background save */
+void cowBkgdSaveReset() {
+ int j;
+ listNode *ln;
+
+ if (server.cowDictCopied != NULL) {
+ for (j = 0; j < server.dbnum; j++) {
+ if (server.cowSaveDb[j].dict != NULL) {
+ /* restore normal dictionary destructors */
+ restore_dictobj(server.db[j].dict, &server.cowSaveDbExt[j]);
+ server.cowSaveDb[j].dict = NULL;
+ }
+
+ if (server.cowSaveDbExt[j].dictArray != NULL) {
+ cowReleaseDictArray(server.cowSaveDbExt[j].dictArray);
+ server.cowSaveDbExt[j].dictArray = NULL;
+ }
+
+ if (server.cowSaveDb[j].expires != NULL &&
+ server.cowSaveDb[j].expires != server.db[j].expires) {
+ dictRelease(server.cowSaveDb[j].expires);
+ server.cowSaveDb[j].expires = NULL;
+ }
+ }
+ }
+
+ server.cowCurIters.curDbDictIter = NULL;
+ server.cowCurIters.curObjDictIter = NULL;
+ server.cowCurIters.curObjZDictIter = NULL;
+ server.cowCurIters.curObjListIter = NULL;
+
+ /* cleanup table of copied items */
+ if (server.cowDictCopied != NULL) {
+ dictRelease(server.cowDictCopied);
+ server.cowDictCopied = NULL;
+ }
+
+ if (server.cowDictConverted != NULL) {
+ dictRelease(server.cowDictConverted);
+ server.cowDictConverted = NULL;
+ }
+
+ /* delete all deferred items */
+ redisLog(REDIS_NOTICE, "cowBkgdSaveReset deleting %d SDS and %d obj items",
+ listLength(deferSdsDelete), listLength(deferObjDelete));
+ while ( (ln = listFirst(deferSdsDelete)) != NULL) {
+ sdsfree((sds)(ln->value));
+ listDelNode(deferSdsDelete, ln);
+ }
+ while ( (ln = listFirst(deferObjDelete)) != NULL) {
+ if (ln->value != NULL) {
+ decrRefCount(ln->value);
+ }
+ listDelNode(deferObjDelete, ln);
+ }
+}
+
+/* requires sync with main thread */
+void cowBkgdSaveStart() {
+ int j;
+
+ cowBkgdSaveReset();
+ server.cowDictCopied = dictCreate(&ptrDictType, NULL);
+ server.cowDictConverted = dictCreate(&copiedCollectionDictType, NULL);
+ server.isBackgroundSaving = 1;
+ for (j = 0; j < server.dbnum; j++) {
+ /* copy dictionary references for saving */
+ server.cowSaveDb[j].dict = server.db[j].dict;
+ server.cowSaveDb[j].expires = server.db[j].expires;
+ server.cowSaveDb[j].blocking_keys = server.db[j].blocking_keys;
+ server.cowSaveDb[j].watched_keys = server.db[j].watched_keys;
+ }
+}
+
+/* requires sync with main thread */
+void cowBkgdSaveStop() {
+ server.isBackgroundSaving = 0;
+ cowBkgdSaveReset();
+}
+
+
+
+/* get converted object for saving */
+void *getRoConvertedObj(void *key, void *o) {
+ cowLock();
+ if (server.cowDictConverted != NULL) {
+ dictEntry *de;
+ de = dictFind(server.cowDictConverted, key);
+ if (de != NULL) {
+ o = de->val;
+ }
+ }
+ cowUnlock();
+ return o;
+}
+
+
+/* Iterators for saving */
+
+int roDBDictSize(int id) {
+ if (server.isBackgroundSaving != 0) {
+ if (server.cowSaveDbExt[id].dictArray != NULL) {
+ return server.cowSaveDbExt[id].dictArray->numele;
+ }
+ }
+ return dictSize(server.db[id].dict);
+}
+
+/* iterator for DB dictionary */
+roDictIter *roDBGetIterator(int id) {
+ roDictIter *iter;
+ iter = (roDictIter *)zmalloc(sizeof(roDictIter));
+
+ cowLock();
+ iter->di = dictGetSafeIterator(server.db[id].dict);
+ iter->hdict = server.db[id].dict;
+ iter->ar = NULL;
+ iter->pos = 0;
+
+ if (server.isBackgroundSaving != 0) {
+ if (server.cowSaveDbExt[id].dictArray != NULL) {
+ iter->ar = server.cowSaveDbExt[id].dictArray;
+ }
+ server.cowCurIters.curDbDictIter = iter;
+ }
+ cowUnlock();
+ return iter;
+}
+
+/* iterator for hash dictionary (not DB) */
+roDictIter *roDictGetIterator(dict *d, cowDictArray *ro) {
+ roDictIter *iter;
+ iter = (roDictIter *)zmalloc(sizeof(roDictIter));
+
+ cowLock();
+ if (d != NULL) {
+ iter->di = dictGetIterator(d);
+ } else {
+ iter->di = NULL;
+ }
+ iter->hdict = d;
+ iter->ar = ro;
+ iter->pos = 0;
+ if (server.isBackgroundSaving != 0) {
+ server.cowCurIters.curObjDictIter = iter;
+ }
+ cowUnlock();
+
+ return iter;
+}
+
+dictEntry *roDictNext(roDictIter *iter) {
+ dictEntry *de = NULL;
+
+ cowLock();
+ if (iter->ar != NULL) {
+ if (iter->pos >= 0 && iter->pos < iter->ar->numele) {
+ de = &iter->ar->de[iter->pos];
+ iter->pos++;
+ }
+ } else if (iter->di != NULL) {
+ de = dictNext(iter->di);
+ iter->pos++;
+ }
+ if (de == NULL) {
+ iter->pos = -1;
+ }
+ cowUnlock();
+
+ return de;
+}
+
+void roDictReleaseIterator(roDictIter *iter) {
+ server.cowCurIters.curObjDictIter = NULL;
+ if (iter->di != NULL) {
+ dictReleaseIterator(iter->di);
+ }
+ zfree(iter);
+}
+
+
+/* iterator for zset hash dictionary */
+roZDictIter *roZDictGetIterator(dict *d, cowDictZArray *ro) {
+ roZDictIter *iter;
+ iter = (roZDictIter *)zmalloc(sizeof(roZDictIter));
+
+ cowLock();
+ if (d != NULL) {
+ iter->di = dictGetIterator(d);