Permalink
Browse files

Nasty bug of the new DB format fixed, objects sharing implemented

  • Loading branch information...
1 parent d9f650b commit 10c43610dee70032db40008996790bdeb54632b6 @antirez committed Mar 25, 2009
Showing with 95 additions and 31 deletions.
  1. +1 −0 TODO
  2. +88 −31 redis.c
  3. +6 −0 redis.conf
View
1 TODO
@@ -8,5 +8,6 @@
- check 'server.dirty' everywere
- replication automated tests
- a command, or an external tool, to perform the MD5SUM of the whole dataset, so that if the dataset between two servers is identical, so will be the MD5SUM
+- objects sharing, "objectsharing yes", "objectsharingpool 1024"
* Include Lua and Perl bindings
View
119 redis.c
@@ -95,10 +95,10 @@
* 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
* 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
* 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
- * 11|000000 [64 bit integer] => if it's 11, a full 64 bit len will follow
+ * 11|000000 reserved for future uses
*
- * 64 bit lengths are not used currently. Lenghts up to 63 are stored using
- * a single byte, most DB keys, and may values, will fit inside. */
+ * Lenghts up to 63 are stored using a single byte, most DB keys, and may
+ * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
@@ -173,6 +173,8 @@ struct redisServer {
int port;
int fd;
dict **dict;
+ dict *sharingpool;
+ unsigned int sharingpoolsize;
long long dirty; /* changes to DB from the last save */
list *clients;
list *slaves, *monitors;
@@ -199,6 +201,7 @@ struct redisServer {
char *logfile;
char *bindaddr;
char *dbfilename;
+ int shareobjects;
/* Replication related */
int isslave;
char *masterhost;
@@ -258,6 +261,7 @@ static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
+static robj *tryObjectSharing(robj *o);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
@@ -627,7 +631,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
- server.usedmemory);
+ server.usedmemory,
+ dictGetHashTableUsed(server.sharingpool));
}
/* Close connections of timedout clients */
@@ -739,6 +744,7 @@ static void initServerConfig() {
server.daemonize = 0;
server.pidfile = "/var/run/redis.pid";
server.dbfilename = "dump.rdb";
+ server.shareobjects = 0;
ResetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@@ -765,18 +771,17 @@ static void initServer() {
createSharedObjects();
server.el = aeCreateEventLoop();
server.dict = zmalloc(sizeof(dict*)*server.dbnum);
+ server.sharingpool = dictCreate(&setDictType,NULL);
+ server.sharingpoolsize = 1024;
if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
oom("server initialization"); /* Fatal OOM */
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
if (server.fd == -1) {
redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
exit(1);
}
- for (j = 0; j < server.dbnum; j++) {
+ for (j = 0; j < server.dbnum; j++)
server.dict[j] = dictCreate(&hashDictType,NULL);
- if (!server.dict[j])
- oom("dictCreate"); /* Fatal OOM */
- }
server.cronloops = 0;
server.bgsaveinprogress = 0;
server.lastsave = time(NULL);
@@ -895,6 +900,13 @@ static void loadServerConfig(char *filename) {
else {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ } else if (!strcmp(argv[0],"shareobjects") && argc == 2) {
+ sdstolower(argv[1]);
+ if (!strcmp(argv[1],"yes")) server.shareobjects = 1;
+ else if (!strcmp(argv[1],"no")) server.shareobjects = 0;
+ else {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
} else if (!strcmp(argv[0],"daemonize") && argc == 2) {
sdstolower(argv[1]);
if (!strcmp(argv[1],"yes")) server.daemonize = 1;
@@ -1102,6 +1114,12 @@ static int processCommand(redisClient *c) {
return 1;
}
}
+ /* Let's try to share objects on the command arguments vector */
+ if (server.shareobjects) {
+ int j;
+ for(j = 1; j < c->argc; j++)
+ c->argv[j] = tryObjectSharing(c->argv[j]);
+ }
/* Exec the command */
dirty = server.dirty;
cmd->proc(c);
@@ -1412,6 +1430,51 @@ static void decrRefCount(void *obj) {
}
}
+/* Try to share an object against the shared objects pool */
+static robj *tryObjectSharing(robj *o) {
+ struct dictEntry *de;
+ unsigned long c;
+
+ if (server.shareobjects == 0) return o;
+
+ assert(o->type == REDIS_STRING);
+ de = dictFind(server.sharingpool,o);
+ if (de) {
+ robj *shared = dictGetEntryKey(de);
+
+ c = ((unsigned long) dictGetEntryVal(de))+1;
+ dictGetEntryVal(de) = (void*) c;
+ incrRefCount(shared);
+ decrRefCount(o);
+ return shared;
+ } else {
+ /* Here we are using a stream algorihtm: Every time an object is
+ * shared we increment its count, everytime there is a miss we
+ * recrement the counter of a random object. If this object reaches
+ * zero we remove the object and put the current object instead. */
+ if (dictGetHashTableUsed(server.sharingpool) >=
+ server.sharingpoolsize) {
+ de = dictGetRandomKey(server.sharingpool);
+ assert(de != NULL);
+ c = ((unsigned long) dictGetEntryVal(de))-1;
+ dictGetEntryVal(de) = (void*) c;
+ if (c == 0) {
+ dictDelete(server.sharingpool,de->key);
+ }
+ } else {
+ c = 0; /* If the pool is empty we want to add this object */
+ }
+ if (c == 0) {
+ int retval;
+
+ retval = dictAdd(server.sharingpool,o,(void*)1);
+ assert(retval == DICT_OK);
+ incrRefCount(o);
+ }
+ return o;
+ }
+}
+
/*============================ DB saving/loading ============================ */
static int rdbSaveType(FILE *fp, unsigned char type) {
@@ -1424,23 +1487,31 @@ static int rdbSaveLen(FILE *fp, uint32_t len) {
if (len < (1<<6)) {
/* Save a 6 bit len */
- buf[0] = (len&0xFF)|REDIS_RDB_6BITLEN;
+ buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
if (fwrite(buf,1,1,fp) == 0) return -1;
} else if (len < (1<<14)) {
/* Save a 14 bit len */
- buf[0] = ((len>>8)&0xFF)|REDIS_RDB_14BITLEN;
+ buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
buf[1] = len&0xFF;
if (fwrite(buf,4,1,fp) == 0) return -1;
} else {
/* Save a 32 bit len */
- buf[0] = REDIS_RDB_32BITLEN;
+ buf[0] = (REDIS_RDB_32BITLEN<<6);
if (fwrite(buf,1,1,fp) == 0) return -1;
len = htonl(len);
if (fwrite(&len,4,1,fp) == 0) return -1;
}
return 0;
}
+static int rdbSaveStringObject(FILE *fp, robj *obj) {
+ size_t len = sdslen(obj->ptr);
+
+ if (rdbSaveLen(fp,len) == -1) return -1;
+ if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
+ return 0;
+}
+
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
static int rdbSave(char *filename) {
dictIterator *di = NULL;
@@ -1475,15 +1546,10 @@ static int rdbSave(char *filename) {
robj *o = dictGetEntryVal(de);
if (rdbSaveType(fp,o->type) == -1) goto werr;
- if (rdbSaveLen(fp,sdslen(key->ptr)) == -1) goto werr;
- if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr;
+ if (rdbSaveStringObject(fp,key) == -1) goto werr;
if (o->type == REDIS_STRING) {
/* Save a string value */
- sds sval = o->ptr;
-
- if (rdbSaveLen(fp,sdslen(sval)) == -1) goto werr;
- if (sdslen(sval) &&
- fwrite(sval,sdslen(sval),1,fp) == 0) goto werr;
+ if (rdbSaveStringObject(fp,o) == -1) goto werr;
} else if (o->type == REDIS_LIST) {
/* Save a list value */
list *list = o->ptr;
@@ -1493,10 +1559,7 @@ static int rdbSave(char *filename) {
while(ln) {
robj *eleobj = listNodeValue(ln);
- if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr;
- if (sdslen(eleobj->ptr) &&
- fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
- goto werr;
+ if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ln = ln->next;
}
} else if (o->type == REDIS_SET) {
@@ -1508,13 +1571,9 @@ static int rdbSave(char *filename) {
if (!set) oom("dictGetIteraotr");
if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr;
while((de = dictNext(di)) != NULL) {
- robj *eleobj;
+ robj *eleobj = dictGetEntryKey(de);
- eleobj = dictGetEntryKey(de);
- if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr;
- if (sdslen(eleobj->ptr) &&
- fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
- goto werr;
+ if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
}
dictReleaseIterator(di);
} else {
@@ -1600,7 +1659,6 @@ static uint32_t rdbLoadLen(FILE *fp, int rdbver) {
return ntohl(len);
}
}
- return 0;
}
static robj *rdbLoadStringObject(FILE*fp,int rdbver) {
@@ -1613,7 +1671,7 @@ static robj *rdbLoadStringObject(FILE*fp,int rdbver) {
sdsfree(val);
return NULL;
}
- return createObject(REDIS_STRING,val);
+ return tryObjectSharing(createObject(REDIS_STRING,val));
}
static int rdbLoad(char *filename) {
@@ -1625,7 +1683,6 @@ static int rdbLoad(char *filename) {
dict *d = server.dict[0];
char buf[1024];
int rdbver;
-
fp = fopen(filename,"r");
if (!fp) return REDIS_ERR;
if (fread(buf,9,1,fp) == 0) goto eoferr;
View
@@ -68,3 +68,9 @@ databases 16
# single TCP packet. Uses a bit more CPU but most of the times it is a win
# in terms of number of queries per second. Use 'yes' if unsure.
glueoutputbuf yes
+
+# Use object sharing. Can save a lot of memory if you have many common
+# string in your dataset, but performs lookups against the shared objects
+# pool so it uses more CPU and can be a bit slower. Usually it's a good
+# idea.
+shareobjects no

0 comments on commit 10c4361

Please sign in to comment.