Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

redis.c split into many different C files.

networking related stuff moved into networking.c

moved more code

more work on layout of source code

SDS instantaneous memory saving. By Pieter and Salvatore at VMware ;)

cleanly compiling again after the first split, now splitting it in more C files

moving more things around... work in progress

split replication code

splitting more

Sets split

Hash split

replication split

even more splitting

more splitting

minor change
  • Loading branch information...
commit e2641e09cc0daf44f63f654230f72d22acf3a9af 1 parent c2ff0e9
@antirez authored
Showing with 11,811 additions and 12,084 deletions.
  1. +0 −11,621 redis.c
  2. +0 −75 redis.h
  3. +3 −6 { → src}/Makefile
  4. 0  { → src}/adlist.c
  5. 0  { → src}/adlist.h
  6. 0  { → src}/ae.c
  7. 0  { → src}/ae.h
  8. 0  { → src}/ae_epoll.c
  9. 0  { → src}/ae_kqueue.c
  10. 0  { → src}/ae_select.c
  11. 0  { → src}/anet.c
  12. 0  { → src}/anet.h
  13. +694 −0 src/aof.c
  14. +438 −0 src/config.c
  15. 0  { → src}/config.h
  16. +508 −0 src/db.c
  17. +309 −0 src/debug.c
  18. 0  { → src}/dict.c
  19. 0  { → src}/dict.h
  20. 0  { → src}/fmacros.h
  21. 0  { → src}/linenoise.c
  22. 0  { → src}/linenoise.h
  23. 0  { → src}/lzf.h
  24. 0  { → src}/lzfP.h
  25. 0  { → src}/lzf_c.c
  26. 0  { → src}/lzf_d.c
  27. 0  { → src}/mkreleasehdr.sh
  28. +266 −0 src/multi.c
  29. +589 −0 src/networking.c
  30. +405 −0 src/object.c
  31. 0  { → src}/pqsort.c
  32. 0  { → src}/pqsort.h
  33. +259 −0 src/pubsub.c
  34. +886 −0 src/rdb.c
  35. 0  { → src}/redis-benchmark.c
  36. 0  { → src}/redis-check-aof.c
  37. 0  { → src}/redis-check-dump.c
  38. 0  { → src}/redis-cli.c
  39. +1,516 −0 src/redis.c
  40. +885 −0 src/redis.h
  41. 0  { → src}/release.c
  42. +475 −0 src/replication.c
  43. +26 −1 { → src}/sds.c
  44. +4 −3 { → src}/sds.h
  45. 0  { → src}/sha1.c
  46. 0  { → src}/sha1.h
  47. 0  { → src}/solarisfixes.h
  48. +383 −0 src/sort.c
  49. +397 −0 src/t_hash.c
  50. +829 −0 src/t_list.c
  51. +349 −0 src/t_set.c
  52. +251 −0 src/t_string.c
  53. +985 −0 src/t_zset.c
  54. +223 −0 src/util.c
  55. +1 −0  src/version.h
  56. +1,126 −0 src/vm.c
  57. 0  { → src}/ziplist.c
  58. 0  { → src}/ziplist.h
  59. 0  { → src}/zipmap.c
  60. 0  { → src}/zipmap.h
  61. 0  { → src}/zmalloc.c
  62. 0  { → src}/zmalloc.h
  63. +0 −374 staticsymbols.h
  64. +2 −2 tests/integration/aof.tcl
  65. +2 −2 tests/support/server.tcl
View
11,621 redis.c
0 additions, 11,621 deletions not shown
View
75 redis.h
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * * Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * * Neither the name of Redis nor the names of its contributors may be used
- * to endorse or promote products derived from this software without
- * specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef __REDIS_H__
-#define __REDIS_H__
-
-enum
-{
- REG_GS = 0,
-# define REG_GS REG_GS
- REG_FS,
-# define REG_FS REG_FS
- REG_ES,
-# define REG_ES REG_ES
- REG_DS,
-# define REG_DS REG_DS
- REG_EDI,
-# define REG_EDI REG_EDI
- REG_ESI,
-# define REG_ESI REG_ESI
- REG_EBP,
-# define REG_EBP REG_EBP
- REG_ESP,
-# define REG_ESP REG_ESP
- REG_EBX,
-# define REG_EBX REG_EBX
- REG_EDX,
-# define REG_EDX REG_EDX
- REG_ECX,
-# define REG_ECX REG_ECX
- REG_EAX,
-# define REG_EAX REG_EAX
- REG_TRAPNO,
-# define REG_TRAPNO REG_TRAPNO
- REG_ERR,
-# define REG_ERR REG_ERR
- REG_EIP,
-# define REG_EIP REG_EIP
- REG_CS,
-# define REG_CS REG_CS
- REG_EFL,
-# define REG_EFL REG_EFL
- REG_UESP,
-# define REG_UESP REG_UESP
- REG_SS
-# define REG_SS REG_SS
-};
-
-#endif
View
9 Makefile → src/Makefile
@@ -15,7 +15,7 @@ endif
CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF)
DEBUG?= -g -rdynamic -ggdb
-OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o
+OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o vm.o pubsub.o multi.o debug.o sort.o
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o linenoise.o
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@ -48,7 +48,7 @@ redis-check-dump.o: redis-check-dump.c lzf.h
redis-cli.o: redis-cli.c fmacros.h anet.h sds.h adlist.h zmalloc.h \
linenoise.h
redis.o: redis.c fmacros.h config.h redis.h ae.h sds.h anet.h dict.h \
- adlist.h zmalloc.h lzf.h pqsort.h zipmap.h ziplist.h sha1.h staticsymbols.h
+ adlist.h zmalloc.h lzf.h pqsort.h zipmap.h ziplist.h sha1.h
release.o: release.c release.h
sds.o: sds.c sds.h zmalloc.h
sha1.o: sha1.c sha1.h
@@ -83,11 +83,8 @@ clean:
dep:
$(CC) -MM *.c
-staticsymbols:
- tclsh utils/build-static-symbols.tcl > staticsymbols.h
-
test:
- tclsh8.5 tests/test_helper.tcl --tags "${TAGS}"
+ (cd ..; tclsh8.5 tests/test_helper.tcl --tags "${TAGS}")
bench:
./redis-benchmark
View
0  adlist.c → src/adlist.c
File renamed without changes
View
0  adlist.h → src/adlist.h
File renamed without changes
View
0  ae.c → src/ae.c
File renamed without changes
View
0  ae.h → src/ae.h
File renamed without changes
View
0  ae_epoll.c → src/ae_epoll.c
File renamed without changes
View
0  ae_kqueue.c → src/ae_kqueue.c
File renamed without changes
View
0  ae_select.c → src/ae_select.c
File renamed without changes
View
0  anet.c → src/anet.c
File renamed without changes
View
0  anet.h → src/anet.h
File renamed without changes
View
694 src/aof.c
@@ -0,0 +1,694 @@
+#include "redis.h"
+
+#include <signal.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+
+/* Called when the user switches from "appendonly yes" to "appendonly no"
+ * at runtime using the CONFIG command.
+ *
+ * Flushes and fsyncs what is still buffered, closes the AOF descriptor and
+ * resets the AOF related state. If a background AOF rewrite is in progress
+ * the child is killed and reaped, the buffer of accumulated differences is
+ * reset and the temp file produced by the child is removed. */
+void stopAppendOnly(void) {
+    flushAppendOnlyFile();
+    aof_fsync(server.appendfd);
+    close(server.appendfd);
+
+    server.appendfd = -1;
+    server.appendseldb = -1;
+    server.appendonly = 0;
+    /* rewrite operation in progress? kill it, wait child exit.
+     * The pid of the rewriting child is the one stored in
+     * server.bgrewritechildpid by rewriteAppendOnlyFileBackground(), so
+     * this is the field to test, signal and reset here. */
+    if (server.bgrewritechildpid != -1) {
+        int statloc;
+
+        if (kill(server.bgrewritechildpid,SIGKILL) != -1)
+            wait3(&statloc,0,NULL);
+        /* reset the buffer accumulating changes while the child saves */
+        sdsfree(server.bgrewritebuf);
+        server.bgrewritebuf = sdsempty();
+        /* the killed child may have left its partial temp file behind */
+        aofRemoveTempFile(server.bgrewritechildpid);
+        server.bgrewritechildpid = -1;
+    }
+}
+
+/* Called when the user switches from "appendonly no" to "appendonly yes"
+ * at runtime using the CONFIG command.
+ *
+ * Opens (creating it if needed) the append only file and triggers a
+ * background rewrite. Returns REDIS_OK on success. On failure REDIS_ERR is
+ * returned and the AOF state is rolled back: server.appendonly is 0 and
+ * server.appendfd is -1, so that code testing "appendfd != -1" (see
+ * backgroundRewriteDoneHandler()) does not consider AOF enabled. */
+int startAppendOnly(void) {
+    server.appendonly = 1;
+    server.lastfsync = time(NULL);
+    server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
+    if (server.appendfd == -1) {
+        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
+        /* Don't leave AOF flagged as active without a file to write to. */
+        server.appendonly = 0;
+        return REDIS_ERR;
+    }
+    if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
+        server.appendonly = 0;
+        close(server.appendfd);
+        /* Forget the descriptor we just closed. */
+        server.appendfd = -1;
+        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
+        return REDIS_ERR;
+    }
+    return REDIS_OK;
+}
+
+/* Write the append only file buffer on disk.
+ *
+ * Since we are required to write the AOF before replying to the client,
+ * and the only way the client socket can get a write is when entering the
+ * event loop, all the AOF writes are accumulated in a memory buffer
+ * (server.aofbuf) that this function writes on disk just before the event
+ * loop is entered again.
+ *
+ * A failed or short write is logged and terminates the process. After a
+ * successful write the buffer is emptied and the file is fsynced according
+ * to server.appendfsync. */
+void flushAppendOnlyFile(void) {
+    size_t pending = sdslen(server.aofbuf);
+    ssize_t written;
+    time_t t;
+    int must_sync;
+
+    if (pending == 0) return;
+
+    /* We want to perform a single write. This should be guaranteed atomic
+     * at least if the filesystem we are writing is a real physical one.
+     * While this will save us against the server being killed I don't think
+     * there is much to do about the whole server stopping for power problems
+     * or alike */
+    written = write(server.appendfd,server.aofbuf,pending);
+    if (written != (signed)pending) {
+        /* Better to abort than to give the illusion that everything is
+         * working as expected. */
+        if (written != -1) {
+            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
+        } else {
+            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
+        }
+        exit(1);
+    }
+    sdsfree(server.aofbuf);
+    server.aofbuf = sdsempty();
+
+    /* With no-appendfsync-on-rewrite set to yes the fsync is skipped while
+     * a child is performing heavy I/O on disk. */
+    if (server.no_appendfsync_on_rewrite) {
+        if (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1)
+            return;
+    }
+
+    /* Fsync if needed: always, or when more than one second elapsed since
+     * the last fsync with the "everysec" policy. */
+    t = time(NULL);
+    must_sync = (server.appendfsync == APPENDFSYNC_ALWAYS);
+    if (!must_sync && server.appendfsync == APPENDFSYNC_EVERYSEC)
+        must_sync = (t-server.lastfsync > 1);
+    if (!must_sync) return;
+
+    /* aof_fsync is defined as fdatasync() for Linux in order to avoid
+     * flushing metadata. */
+    aof_fsync(server.appendfd);
+    server.lastfsync = t;
+}
+
+/* Append to 'buf' the command made of the 'argc' objects in 'argv', in the
+ * form "*<argc>\r\n" followed by "$<len>\r\n<payload>\r\n" for every
+ * argument. Every argument goes through getDecodedObject() before its ptr
+ * is read as an sds string, and the object obtained is released with
+ * decrRefCount(). The (possibly reallocated) buffer is returned. */
+sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) {
+    int i = 0;
+
+    buf = sdscatprintf(buf,"*%d\r\n",argc);
+    while (i < argc) {
+        robj *decoded = getDecodedObject(argv[i++]);
+        size_t len = sdslen(decoded->ptr);
+
+        buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)len);
+        buf = sdscatlen(buf,decoded->ptr,len);
+        buf = sdscatlen(buf,"\r\n",2);
+        decrRefCount(decoded);
+    }
+    return buf;
+}
+
+/* Append to 'buf' an "EXPIREAT <key> <unixtime>" command, where the
+ * unix time is the current time plus the relative timeout stored in
+ * 'seconds'. The temporary objects created here are released before
+ * returning; 'key' is only borrowed. Returns the updated buffer. */
+sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
+    robj *cmdv[3];
+    robj *decoded;
+    long deadline;
+
+    /* Decode the object so that its ptr can be parsed with strtol */
+    decoded = getDecodedObject(seconds);
+    deadline = time(NULL)+strtol(decoded->ptr,NULL,10);
+    decrRefCount(decoded);
+
+    cmdv[0] = createStringObject("EXPIREAT",8);
+    cmdv[1] = key;
+    cmdv[2] = createObject(REDIS_STRING,
+                           sdscatprintf(sdsempty(),"%ld",deadline));
+    buf = catAppendOnlyGenericCommand(buf,3,cmdv);
+    decrRefCount(cmdv[0]);
+    decrRefCount(cmdv[2]);
+    return buf;
+}
+
+/* Encode the command 'cmd' with arguments 'argv'/'argc', executed against
+ * the DB with index 'dictid', and queue it for the append only file.
+ *
+ * A SELECT is emitted first when 'dictid' differs from the DB of the last
+ * appended command. EXPIRE is translated into EXPIREAT and SETEX into
+ * SET + EXPIREAT. The encoded text is appended to server.aofbuf and, when
+ * a background rewrite is in progress, to server.bgrewritebuf as well. */
+void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+    sds cmdbuf = sdsempty();
+
+    /* The DB this command was targeting is not the same as the one of the
+     * last command we appended: a SELECT command is needed. */
+    if (server.appendseldb != dictid) {
+        char dbnum[64];
+
+        snprintf(dbnum,sizeof(dbnum),"%d",dictid);
+        cmdbuf = sdscatprintf(cmdbuf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
+            (unsigned long)strlen(dbnum),dbnum);
+        server.appendseldb = dictid;
+    }
+
+    if (cmd->proc == setexCommand) {
+        /* Translate SETEX to SET and EXPIREAT */
+        robj *setargv[3];
+
+        setargv[0] = createStringObject("SET",3);
+        setargv[1] = argv[1];
+        setargv[2] = argv[3];
+        cmdbuf = catAppendOnlyGenericCommand(cmdbuf,3,setargv);
+        decrRefCount(setargv[0]);
+        cmdbuf = catAppendOnlyExpireAtCommand(cmdbuf,argv[1],argv[2]);
+    } else if (cmd->proc == expireCommand) {
+        /* Translate EXPIRE into EXPIREAT */
+        cmdbuf = catAppendOnlyExpireAtCommand(cmdbuf,argv[1],argv[2]);
+    } else {
+        cmdbuf = catAppendOnlyGenericCommand(cmdbuf,argc,argv);
+    }
+
+    /* Append to the AOF buffer. This will be flushed on disk just before
+     * re-entering the event loop, so before the client will get a
+     * positive reply about the operation performed. */
+    server.aofbuf = sdscatlen(server.aofbuf,cmdbuf,sdslen(cmdbuf));
+
+    /* If a background append only file rewriting is in progress we want to
+     * accumulate the differences between the child DB and the current one
+     * in a buffer, so that when the child process will do its work we
+     * can append the differences to the new append only file. */
+    if (server.bgrewritechildpid != -1) {
+        server.bgrewritebuf =
+            sdscatlen(server.bgrewritebuf,cmdbuf,sdslen(cmdbuf));
+    }
+
+    sdsfree(cmdbuf);
+}
+
+/* Commands are always executed in the context of a client, so loading the
+ * append only file requires a fake one. The client returned has no socket
+ * (fd is -1), has DB 0 selected, an empty query buffer, no arguments, an
+ * empty reply list and a freshly initialized MULTI state. It must be
+ * released with freeFakeClient(). */
+struct redisClient *createFakeClient(void) {
+    struct redisClient *fake = zmalloc(sizeof(*fake));
+
+    selectDb(fake,0);
+    fake->fd = -1;
+    fake->flags = 0;
+    fake->argc = 0;
+    fake->argv = NULL;
+    fake->querybuf = sdsempty();
+    /* The fake client is marked as a slave waiting for the synchronization
+     * so that Redis will not try to send replies to this client. */
+    fake->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+    fake->reply = listCreate();
+    listSetDupMethod(fake->reply,dupClientReplyValue);
+    listSetFreeMethod(fake->reply,decrRefCount);
+    initClientMultiState(fake);
+    return fake;
+}
+
+/* Release a client created with createFakeClient(): its reply list, its
+ * query buffer, its MULTI state and finally the structure itself. */
+void freeFakeClient(struct redisClient *c) {
+    listRelease(c->reply);
+    sdsfree(c->querybuf);
+    freeClientMultiState(c);
+    zfree(c);
+}
+
+/* Replay the append log file. On success REDIS_OK is returned. On non fatal
+ * error (the append only file is zero-length) REDIS_ERR is returned. On
+ * fatal error (file that can't be opened, read error, bad format, unknown
+ * command, unterminated MULTI) an error message is logged and the program
+ * exits.
+ *
+ * Every command found in the file is executed in the context of a fake
+ * client. AOF is temporarily disabled while loading and restored to its
+ * previous value on success. */
+int loadAppendOnlyFile(char *filename) {
+    struct redisClient *fakeClient;
+    FILE *fp = fopen(filename,"r");
+    struct redis_stat sb;
+    int appendonly = server.appendonly;
+
+    /* The stream must be validated before fileno() is called on it. */
+    if (fp == NULL) {
+        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
+        exit(1);
+    }
+
+    if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
+        fclose(fp); /* don't leak the stream on the non fatal path */
+        return REDIS_ERR;
+    }
+
+    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
+     * to the same file we're about to read. */
+    server.appendonly = 0;
+
+    fakeClient = createFakeClient();
+    while(1) {
+        int argc, j;
+        unsigned long len;
+        robj **argv;
+        char buf[128];
+        sds argsds;
+        struct redisCommand *cmd;
+        int force_swapout;
+
+        if (fgets(buf,sizeof(buf),fp) == NULL) {
+            if (feof(fp))
+                break;
+            else
+                goto readerr;
+        }
+        if (buf[0] != '*') goto fmterr;
+        argc = atoi(buf+1);
+        /* A command has at least its name: without this check argv[0]
+         * below would be read from an empty (or bogus sized) allocation. */
+        if (argc < 1) goto fmterr;
+        argv = zmalloc(sizeof(robj*)*argc);
+        for (j = 0; j < argc; j++) {
+            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
+            if (buf[0] != '$') goto fmterr;
+            len = strtol(buf+1,NULL,10);
+            argsds = sdsnewlen(NULL,len);
+            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
+            argv[j] = createObject(REDIS_STRING,argsds);
+            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
+        }
+
+        /* Command lookup */
+        cmd = lookupCommand(argv[0]->ptr);
+        if (!cmd) {
+            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
+            exit(1);
+        }
+        /* Try object encoding */
+        if (cmd->flags & REDIS_CMD_BULK)
+            argv[argc-1] = tryObjectEncoding(argv[argc-1]);
+        /* Run the command in the context of a fake client */
+        fakeClient->argc = argc;
+        fakeClient->argv = argv;
+        cmd->proc(fakeClient);
+        /* Discard the reply objects list from the fake client */
+        while(listLength(fakeClient->reply))
+            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+        /* Clean up, ready for the next command */
+        for (j = 0; j < argc; j++) decrRefCount(argv[j]);
+        zfree(argv);
+        /* Handle swapping while loading big datasets when VM is on */
+        force_swapout = 0;
+        if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
+            force_swapout = 1;
+
+        if (server.vm_enabled && force_swapout) {
+            while (zmalloc_used_memory() > server.vm_max_memory) {
+                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
+            }
+        }
+    }
+
+    /* This point can only be reached when EOF is reached without errors.
+     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
+    if (fakeClient->flags & REDIS_MULTI) goto readerr;
+
+    fclose(fp);
+    freeFakeClient(fakeClient);
+    server.appendonly = appendonly;
+    return REDIS_OK;
+
+readerr:
+    if (feof(fp)) {
+        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
+    } else {
+        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
+    }
+    exit(1);
+fmterr:
+    redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+    exit(1);
+}
+
+/* Write the 'len' bytes at 's' (binary safe) into 'fp' in the bulk format
+ * $<count>\r\n<payload>\r\n
+ * Returns 1 on success, 0 as soon as one of the writes fails. */
+int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
+    char hdr[128];
+    int n = 1;
+
+    /* Header: '$', the decimal length, CRLF */
+    hdr[0] = '$';
+    n += ll2string(hdr+1,sizeof(hdr)-1,len);
+    hdr[n++] = '\r';
+    hdr[n++] = '\n';
+
+    /* Short circuit keeps the original order: header, payload (skipped
+     * when empty), trailing CRLF. */
+    return fwrite(hdr,n,1,fp) != 0 &&
+           (len == 0 || fwrite(s,len,1,fp) != 0) &&
+           fwrite("\r\n",2,1,fp) != 0;
+}
+
+/* Write the double 'd' into 'fp' in bulk format $<count>\r\n<payload>\r\n
+ * using the "%.17g" representation as payload.
+ * Returns 1 on success, 0 if a write fails. */
+int fwriteBulkDouble(FILE *fp, double d) {
+    char payload[128], header[128];
+    size_t plen;
+
+    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
+    plen = strlen(payload);
+    /* The announced count does not include the trailing CRLF */
+    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)plen-2);
+    return fwrite(header,strlen(header),1,fp) != 0 &&
+           fwrite(payload,plen,1,fp) != 0;
+}
+
+/* Write the long long 'l' into 'fp' in bulk format
+ * $<count>\r\n<payload>\r\n, the payload being its decimal representation
+ * as produced by ll2string(). Returns 1 on success, 0 if the write fails. */
+int fwriteBulkLongLong(FILE *fp, long long l) {
+    char digits[128], out[128];
+    unsigned int ndigits = ll2string(digits,32,l);
+    unsigned int outlen = snprintf(out,sizeof(out),"$%u\r\n%s\r\n",ndigits,digits);
+
+    return fwrite(out,outlen,1,fp) == 0 ? 0 : 1;
+}
+
+/* Write a string object into 'fp' in bulk format, picking the writer from
+ * the object encoding: raw strings go through fwriteBulkString(), integer
+ * encoded ones through fwriteBulkLongLong(). Any other encoding panics.
+ * The return value is the one of the writer called. */
+int fwriteBulkObject(FILE *fp, robj *obj) {
+    /* getDecodedObject is avoided on purpose to help copy-on-write (we are
+     * often in a child process when this function is called). */
+    if (obj->encoding == REDIS_ENCODING_RAW)
+        return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
+    if (obj->encoding == REDIS_ENCODING_INT)
+        return fwriteBulkLongLong(fp,(long)obj->ptr);
+    redisPanic("Unknown string encoding");
+}
+
+/* Write a sequence of commands able to fully rebuild the dataset into
+ * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
+ *
+ * The commands are first written into a temp file named after the pid of
+ * the calling process, which is flushed, fsynced and finally renamed into
+ * 'filename'. For every non empty DB a SELECT is emitted followed, for
+ * every key, by the SET / RPUSH / SADD / ZADD / HSET commands needed to
+ * rebuild the value and by an EXPIREAT if the key has a not yet elapsed
+ * expire.
+ *
+ * Returns REDIS_OK on success. On error REDIS_ERR is returned; on a write
+ * error the temp file is also removed. */
+int rewriteAppendOnlyFile(char *filename) {
+    dictIterator *di = NULL;
+    dictEntry *de;
+    FILE *fp;
+    char tmpfile[256];
+    int j;
+    time_t now = time(NULL);
+
+    /* Note that we have to use a different temp name here compared to the
+     * one used by rewriteAppendOnlyFileBackground() function. */
+    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
+    fp = fopen(tmpfile,"w");
+    if (!fp) {
+        redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
+        return REDIS_ERR;
+    }
+    for (j = 0; j < server.dbnum; j++) {
+        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
+        redisDb *db = server.db+j;
+        dict *d = db->dict;
+        if (dictSize(d) == 0) continue;
+        di = dictGetIterator(d);
+        if (!di) {
+            fclose(fp);
+            return REDIS_ERR;
+        }
+
+        /* SELECT the new DB */
+        if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
+        if (fwriteBulkLongLong(fp,j) == 0) goto werr;
+
+        /* Iterate this DB writing every entry */
+        while((de = dictNext(di)) != NULL) {
+            sds keystr = dictGetEntryKey(de);
+            robj key, *o;
+            time_t expiretime;
+            int swapped;
+
+            o = dictGetEntryVal(de);
+            initStaticStringObject(key,keystr);
+            /* If the value for this key is swapped, load a preview in memory.
+             * We use a "swapped" flag to remember if we need to free the
+             * value object instead to just increment the ref count anyway
+             * in order to avoid copy-on-write of pages if we are forked() */
+            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
+                o->storage == REDIS_VM_SWAPPING) {
+                swapped = 0;
+            } else {
+                o = vmPreviewObject(o);
+                swapped = 1;
+            }
+            expiretime = getExpire(db,&key);
+
+            /* Save the key and associated value */
+            if (o->type == REDIS_STRING) {
+                /* Emit a SET command */
+                char cmd[]="*3\r\n$3\r\nSET\r\n";
+                if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                /* Key and value */
+                if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                if (fwriteBulkObject(fp,o) == 0) goto werr;
+            } else if (o->type == REDIS_LIST) {
+                /* Emit the RPUSHes needed to rebuild the list */
+                char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
+                if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+                    unsigned char *zl = o->ptr;
+                    unsigned char *p = ziplistIndex(zl,0);
+                    unsigned char *vstr;
+                    unsigned int vlen;
+                    long long vlong;
+
+                    while(ziplistGet(p,&vstr,&vlen,&vlong)) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (vstr) {
+                            if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
+                                goto werr;
+                        } else {
+                            if (fwriteBulkLongLong(fp,vlong) == 0)
+                                goto werr;
+                        }
+                        p = ziplistNext(zl,p);
+                    }
+                } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
+                    list *list = o->ptr;
+                    listNode *ln;
+                    listIter li;
+
+                    listRewind(list,&li);
+                    while((ln = listNext(&li))) {
+                        robj *eleobj = listNodeValue(ln);
+
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                    }
+                } else {
+                    redisPanic("Unknown list encoding");
+                }
+            } else if (o->type == REDIS_SET) {
+                /* Emit the SADDs needed to rebuild the set */
+                dict *set = o->ptr;
+                dictIterator *di = dictGetIterator(set);
+                dictEntry *de;
+
+                while((de = dictNext(di)) != NULL) {
+                    char cmd[]="*3\r\n$4\r\nSADD\r\n";
+                    robj *eleobj = dictGetEntryKey(de);
+
+                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                }
+                dictReleaseIterator(di);
+            } else if (o->type == REDIS_ZSET) {
+                /* Emit the ZADDs needed to rebuild the sorted set */
+                zset *zs = o->ptr;
+                dictIterator *di = dictGetIterator(zs->dict);
+                dictEntry *de;
+
+                while((de = dictNext(di)) != NULL) {
+                    char cmd[]="*4\r\n$4\r\nZADD\r\n";
+                    robj *eleobj = dictGetEntryKey(de);
+                    double *score = dictGetEntryVal(de);
+
+                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                    if (fwriteBulkDouble(fp,*score) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                }
+                dictReleaseIterator(di);
+            } else if (o->type == REDIS_HASH) {
+                char cmd[]="*4\r\n$4\r\nHSET\r\n";
+
+                /* Emit the HSETs needed to rebuild the hash.
+                 * The bulk writers return 0 (not -1) on failure, and a
+                 * failure must go through werr so that the temp file is
+                 * closed and removed. */
+                if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+                    unsigned char *p = zipmapRewind(o->ptr);
+                    unsigned char *field, *val;
+                    unsigned int flen, vlen;
+
+                    while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkString(fp,(char*)field,flen) == 0)
+                            goto werr;
+                        if (fwriteBulkString(fp,(char*)val,vlen) == 0)
+                            goto werr;
+                    }
+                } else {
+                    dictIterator *di = dictGetIterator(o->ptr);
+                    dictEntry *de;
+
+                    while((de = dictNext(di)) != NULL) {
+                        robj *field = dictGetEntryKey(de);
+                        robj *val = dictGetEntryVal(de);
+
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,field) == 0) goto werr;
+                        if (fwriteBulkObject(fp,val) == 0) goto werr;
+                    }
+                    dictReleaseIterator(di);
+                }
+            } else {
+                redisPanic("Unknown object type");
+            }
+            /* Save the expire time. If this key is already expired the
+             * EXPIREAT is skipped, but we still fall through so that a
+             * swapped value preview is released below. */
+            if (expiretime != -1 && expiretime >= now) {
+                char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
+                if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
+            }
+            if (swapped) decrRefCount(o);
+        }
+        dictReleaseIterator(di);
+    }
+
+    /* Make sure data will not remain on the OS's output buffers */
+    fflush(fp);
+    aof_fsync(fileno(fp));
+    fclose(fp);
+
+    /* Use RENAME to make sure the DB file is changed atomically only
+     * if the generate DB file is ok. */
+    if (rename(tmpfile,filename) == -1) {
+        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
+        unlink(tmpfile);
+        return REDIS_ERR;
+    }
+    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
+    return REDIS_OK;
+
+werr:
+    fclose(fp);
+    unlink(tmpfile);
+    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
+    if (di) dictReleaseIterator(di);
+    return REDIS_ERR;
+}
+
+/* This is how rewriting of the append only file in background works:
+ *
+ * 1) The user calls BGREWRITEAOF
+ * 2) Redis calls this function, that forks():
+ *    2a) the child rewrites the append only file in a temp file.
+ *    2b) the parent accumulates differences in server.bgrewritebuf.
+ * 3) When the child finishes '2a' it exits.
+ * 4) The parent will trap the exit code, if it's OK, will append the
+ *    data accumulated into server.bgrewritebuf into the temp file, and
+ *    finally will rename(2) the temp file in the actual file name.
+ *    Then the new file is reopened as the new append only file. Profit!
+ *
+ * Returns REDIS_ERR if a rewrite is already in progress or if fork() fails,
+ * REDIS_OK otherwise (in the parent; the child never returns). */
+int rewriteAppendOnlyFileBackground(void) {
+    pid_t pid;
+
+    if (server.bgrewritechildpid != -1) return REDIS_ERR;
+    if (server.vm_enabled) waitEmptyIOJobsQueue();
+
+    pid = fork();
+    if (pid == 0) {
+        /* Child: rewrite into a temp file named after our pid, then exit
+         * with 0 on success and 1 on failure. */
+        char childtmp[256];
+
+        if (server.vm_enabled) vmReopenSwapFile();
+        close(server.fd);
+        snprintf(childtmp,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
+        _exit(rewriteAppendOnlyFile(childtmp) == REDIS_OK ? 0 : 1);
+    }
+
+    /* Parent */
+    if (pid == -1) {
+        redisLog(REDIS_WARNING,
+            "Can't rewrite append only file in background: fork: %s",
+            strerror(errno));
+        return REDIS_ERR;
+    }
+    redisLog(REDIS_NOTICE,
+        "Background append only file rewriting started by pid %d",pid);
+    server.bgrewritechildpid = pid;
+    updateDictResizePolicy();
+    /* We set appendseldb to -1 in order to force the next call to the
+     * feedAppendOnlyFile() to issue a SELECT command, so the differences
+     * accumulated by the parent into server.bgrewritebuf will start
+     * with a SELECT statement and it will be safe to merge. */
+    server.appendseldb = -1;
+    return REDIS_OK;
+}
+
+void bgrewriteaofCommand(redisClient *c) {
+ if (server.bgrewritechildpid != -1) {
+ addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
+ return;
+ }
+ if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
+ char *status = "+Background append only file rewriting started\r\n";
+ addReplySds(c,sdsnew(status));
+ } else {
+ addReply(c,shared.err);
+ }
+}
+
+/* Remove the temp file used by the background rewrite child with pid
+ * 'childpid'. The result of unlink() is not checked. */
+void aofRemoveTempFile(pid_t childpid) {
+    char path[256];
+
+    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
+    unlink(path);
+}
+
+/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
+ * Handle this.
+ *
+ * 'statloc' is the wait status of the child. If the child exited with
+ * code 0 the differences accumulated in server.bgrewritebuf are appended
+ * to the temp file it produced, the temp file is renamed into
+ * server.appendfilename and, if AOF is enabled (server.appendfd != -1),
+ * the server switches to the new file for future appends. In every case
+ * the diff buffer is reset, the temp file name is unlinked and
+ * server.bgrewritechildpid is set back to -1. */
+void backgroundRewriteDoneHandler(int statloc) {
+    int exitcode = WEXITSTATUS(statloc);
+    int bysignal = WIFSIGNALED(statloc);
+
+    if (!bysignal && exitcode == 0) {
+        int fd;
+        char tmpfile[256];
+
+        redisLog(REDIS_NOTICE,
+            "Background append only file rewriting terminated with success");
+        /* Now it's time to flush the differences accumulated by the parent */
+        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
+        fd = open(tmpfile,O_WRONLY|O_APPEND);
+        if (fd == -1) {
+            redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
+            goto cleanup;
+        }
+        /* Flush our data... */
+        if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
+                (signed) sdslen(server.bgrewritebuf)) {
+            redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
+            close(fd);
+            goto cleanup;
+        }
+        /* sdslen() is cast as elsewhere in this file so that the argument
+         * matches the %lu conversion. */
+        redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",(unsigned long)sdslen(server.bgrewritebuf));
+        /* Now our work is to rename the temp file into the stable file. And
+         * switch the file descriptor used by the server for append only. */
+        if (rename(tmpfile,server.appendfilename) == -1) {
+            redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
+            close(fd);
+            goto cleanup;
+        }
+        /* Mission completed... almost */
+        redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
+        if (server.appendfd != -1) {
+            /* If append only is actually enabled... */
+            close(server.appendfd);
+            server.appendfd = fd;
+            if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
+            server.appendseldb = -1; /* Make sure it will issue SELECT */
+            redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
+        } else {
+            /* If append only is disabled we just generate a dump in this
+             * format. Why not? */
+            close(fd);
+        }
+    } else if (!bysignal && exitcode != 0) {
+        redisLog(REDIS_WARNING, "Background append only file rewriting error");
+    } else {
+        redisLog(REDIS_WARNING,
+            "Background append only file rewriting terminated by signal %d",
+            WTERMSIG(statloc));
+    }
+cleanup:
+    sdsfree(server.bgrewritebuf);
+    server.bgrewritebuf = sdsempty();
+    aofRemoveTempFile(server.bgrewritechildpid);
+    server.bgrewritechildpid = -1;
+}
View
438 src/config.c
@@ -0,0 +1,438 @@
+#include "redis.h"
+
+/*-----------------------------------------------------------------------------
+ * Config file parsing
+ *----------------------------------------------------------------------------*/
+
+ /* Translate a case-insensitive "yes"/"no" string into 1/0.
+  * Any other input yields -1 so callers can report a bad argument. */
+ int yesnotoi(char *s) {
+     if (strcasecmp(s,"yes") == 0) return 1;
+     if (strcasecmp(s,"no") == 0) return 0;
+     return -1;
+ }
+
+ /* Append one (seconds, changes) save point to server.saveparams,
+  * growing the array by a single slot. */
+ void appendServerSaveParams(time_t seconds, int changes) {
+     int idx = server.saveparamslen;
+     struct saveparam *sp;
+
+     server.saveparams = zrealloc(server.saveparams,
+                                  sizeof(struct saveparam)*(idx+1));
+     sp = &server.saveparams[idx];
+     sp->seconds = seconds;
+     sp->changes = changes;
+     server.saveparamslen = idx+1;
+ }
+
+ /* Discard every configured save point: the array is released and the
+  * server is left with a NULL pointer and a length of zero. */
+ void resetServerSaveParams() {
+     zfree(server.saveparams);
+     server.saveparamslen = 0;
+     server.saveparams = NULL;
+ }
+
+ /* Load the configuration file 'filename' and apply each directive to the
+  * global 'server' structure. The special name "-" reads from standard input.
+  *
+  * Every line is trimmed; blank lines and lines starting with '#' are
+  * skipped. The remaining lines are split on single spaces: the first token
+  * is the (case insensitive) directive name, the others are its arguments.
+  * The "include" directive loads another file recursively.
+  *
+  * There is no error return: if the file can't be opened, or a directive is
+  * unknown, has the wrong number of arguments or an invalid value, a
+  * diagnostic is printed and the process exits with status 1.
+  *
+  * I agree, this is a very rudimentary way to load a configuration...
+  * will improve later if the config gets more complex */
+ void loadServerConfig(char *filename) {
+     FILE *fp;
+     char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
+     int linenum = 0;
+     sds line = NULL;
+
+     if (filename[0] == '-' && filename[1] == '\0')
+         fp = stdin;
+     else {
+         if ((fp = fopen(filename,"r")) == NULL) {
+             redisLog(REDIS_WARNING, "Fatal error, can't open config file '%s'", filename);
+             exit(1);
+         }
+     }
+
+     while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
+         sds *argv;
+         int argc, j;
+
+         linenum++;
+         line = sdsnew(buf);
+         line = sdstrim(line," \t\r\n");
+
+         /* Skip comments and blank lines*/
+         if (line[0] == '#' || line[0] == '\0') {
+             sdsfree(line);
+             continue;
+         }
+
+         /* Split into arguments */
+         argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
+         sdstolower(argv[0]);
+
+         /* Execute config directives */
+         if (!strcasecmp(argv[0],"timeout") && argc == 2) {
+             server.maxidletime = atoi(argv[1]);
+             if (server.maxidletime < 0) {
+                 err = "Invalid timeout value"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"port") && argc == 2) {
+             server.port = atoi(argv[1]);
+             if (server.port < 1 || server.port > 65535) {
+                 err = "Invalid port"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
+             server.bindaddr = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"save") && argc == 3) {
+             int seconds = atoi(argv[1]);
+             int changes = atoi(argv[2]);
+             if (seconds < 1 || changes < 0) {
+                 err = "Invalid save parameters"; goto loaderr;
+             }
+             appendServerSaveParams(seconds,changes);
+         } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
+             if (chdir(argv[1]) == -1) {
+                 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
+                     argv[1], strerror(errno));
+                 exit(1);
+             }
+         } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
+             if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
+             else if (!strcasecmp(argv[1],"verbose")) server.verbosity = REDIS_VERBOSE;
+             else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
+             else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
+             else {
+                 /* List every level accepted above, "verbose" included. */
+                 err = "Invalid log level. Must be one of debug, verbose, notice, warning";
+                 goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
+             FILE *logfp;
+
+             server.logfile = zstrdup(argv[1]);
+             /* "stdout" is represented by a NULL logfile. */
+             if (!strcasecmp(server.logfile,"stdout")) {
+                 zfree(server.logfile);
+                 server.logfile = NULL;
+             }
+             if (server.logfile) {
+                 /* Test if we are able to open the file. The server will not
+                  * be able to abort just for this problem later... */
+                 logfp = fopen(server.logfile,"a");
+                 if (logfp == NULL) {
+                     err = sdscatprintf(sdsempty(),
+                         "Can't open the log file: %s", strerror(errno));
+                     goto loaderr;
+                 }
+                 fclose(logfp);
+             }
+         } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
+             server.dbnum = atoi(argv[1]);
+             if (server.dbnum < 1) {
+                 err = "Invalid number of databases"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"include") && argc == 2) {
+             loadServerConfig(argv[1]);
+         } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
+             server.maxclients = atoi(argv[1]);
+         } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
+             server.maxmemory = memtoll(argv[1],NULL);
+         } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
+             server.masterhost = sdsnew(argv[1]);
+             server.masterport = atoi(argv[2]);
+             server.replstate = REDIS_REPL_CONNECT;
+         } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
+             /* Release a value set by an earlier line or included file,
+              * as CONFIG SET masterauth does. */
+             zfree(server.masterauth);
+             server.masterauth = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
+             if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
+             if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"activerehashing") && argc == 2) {
+             if ((server.activerehashing = yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
+             if ((server.daemonize = yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
+             if ((server.appendonly = yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) {
+             zfree(server.appendfilename);
+             server.appendfilename = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite")
+                    && argc == 2) {
+             if ((server.no_appendfsync_on_rewrite= yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
+             if (!strcasecmp(argv[1],"no")) {
+                 server.appendfsync = APPENDFSYNC_NO;
+             } else if (!strcasecmp(argv[1],"always")) {
+                 server.appendfsync = APPENDFSYNC_ALWAYS;
+             } else if (!strcasecmp(argv[1],"everysec")) {
+                 server.appendfsync = APPENDFSYNC_EVERYSEC;
+             } else {
+                 err = "argument must be 'no', 'always' or 'everysec'";
+                 goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
+             /* Release a value set by an earlier line or included file,
+              * as CONFIG SET requirepass does. */
+             zfree(server.requirepass);
+             server.requirepass = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
+             zfree(server.pidfile);
+             server.pidfile = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
+             zfree(server.dbfilename);
+             server.dbfilename = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
+             if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
+         } else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) {
+             zfree(server.vm_swap_file);
+             server.vm_swap_file = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) {
+             server.vm_max_memory = memtoll(argv[1],NULL);
+         } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) {
+             server.vm_page_size = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
+             server.vm_pages = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
+             server.vm_max_threads = strtoll(argv[1], NULL, 10);
+         } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2){
+             server.hash_max_zipmap_entries = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){
+             server.hash_max_zipmap_value = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){
+             server.list_max_ziplist_entries = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){
+             server.list_max_ziplist_value = memtoll(argv[1], NULL);
+         } else {
+             err = "Bad directive or wrong number of arguments"; goto loaderr;
+         }
+         /* Directive applied: release the tokens and the line itself. */
+         for (j = 0; j < argc; j++)
+             sdsfree(argv[j]);
+         zfree(argv);
+         sdsfree(line);
+     }
+     if (fp != stdin) fclose(fp);
+     return;
+
+ loaderr:
+     /* 'line' and 'linenum' still describe the offending directive. */
+     fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
+     fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
+     fprintf(stderr, ">>> '%s'\n", line);
+     fprintf(stderr, "%s\n", err);
+     exit(1);
+ }
+
+/*-----------------------------------------------------------------------------
+ * CONFIG command for remote configuration
+ *----------------------------------------------------------------------------*/
+
+ /* CONFIG SET <parameter> <value>.
+  *
+  * c->argv[2] names the parameter, c->argv[3] carries the new value. The
+  * client gets +OK on success, or an error reply when the parameter is not
+  * supported, the value is malformed, or AOF could not be turned on. */
+ void configSetCommand(redisClient *c) {
+     robj *o = getDecodedObject(c->argv[3]);
+     char *name = c->argv[2]->ptr;
+     long long ll;
+
+     if (strcasecmp(name,"dbfilename") == 0) {
+         zfree(server.dbfilename);
+         server.dbfilename = zstrdup(o->ptr);
+     } else if (strcasecmp(name,"requirepass") == 0) {
+         zfree(server.requirepass);
+         server.requirepass = zstrdup(o->ptr);
+     } else if (strcasecmp(name,"masterauth") == 0) {
+         zfree(server.masterauth);
+         server.masterauth = zstrdup(o->ptr);
+     } else if (strcasecmp(name,"maxmemory") == 0) {
+         if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0)
+             goto badfmt;
+         server.maxmemory = ll;
+     } else if (strcasecmp(name,"timeout") == 0) {
+         if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+             ll < 0 || ll > LONG_MAX)
+             goto badfmt;
+         server.maxidletime = ll;
+     } else if (strcasecmp(name,"appendfsync") == 0) {
+         if (strcasecmp(o->ptr,"no") == 0)
+             server.appendfsync = APPENDFSYNC_NO;
+         else if (strcasecmp(o->ptr,"everysec") == 0)
+             server.appendfsync = APPENDFSYNC_EVERYSEC;
+         else if (strcasecmp(o->ptr,"always") == 0)
+             server.appendfsync = APPENDFSYNC_ALWAYS;
+         else
+             goto badfmt;
+     } else if (strcasecmp(name,"no-appendfsync-on-rewrite") == 0) {
+         int flag = yesnotoi(o->ptr);
+
+         if (flag == -1) goto badfmt;
+         server.no_appendfsync_on_rewrite = flag;
+     } else if (strcasecmp(name,"appendonly") == 0) {
+         int current = server.appendonly;
+         int enable = yesnotoi(o->ptr);
+
+         if (enable == -1) goto badfmt;
+         /* Only act when the requested state differs from the current one. */
+         if (enable != current) {
+             if (enable == 0) {
+                 stopAppendOnly();
+             } else if (startAppendOnly() == REDIS_ERR) {
+                 addReplySds(c,sdscatprintf(sdsempty(),
+                     "-ERR Unable to turn on AOF. Check server logs.\r\n"));
+                 goto done;
+             }
+         }
+     } else if (strcasecmp(name,"save") == 0) {
+         int vlen, j;
+         sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen);
+         /* Sanity check before touching the current config:
+          * - Even number of args
+          * - Seconds >= 1, changes >= 0 */
+         int valid = !(vlen & 1);
+
+         for (j = 0; valid && j < vlen; j++) {
+             char *eptr;
+             long val = strtoll(v[j], &eptr, 10);
+             long min = (j & 1) ? 0 : 1; /* even slot: seconds, odd: changes */
+
+             if (eptr[0] != '\0' || val < min) valid = 0;
+         }
+         if (!valid) {
+             sdsfreesplitres(v,vlen);
+             goto badfmt;
+         }
+         /* All pairs are fine: replace the old save points. */
+         resetServerSaveParams();
+         for (j = 0; j < vlen; j += 2) {
+             time_t seconds = strtoll(v[j],NULL,10);
+             int changes = strtoll(v[j+1],NULL,10);
+
+             appendServerSaveParams(seconds, changes);
+         }
+         sdsfreesplitres(v,vlen);
+     } else {
+         addReplySds(c,sdscatprintf(sdsempty(),
+             "-ERR not supported CONFIG parameter %s\r\n",
+             (char*)c->argv[2]->ptr));
+         goto done;
+     }
+     addReply(c,shared.ok);
+     goto done;
+
+ badfmt: /* Bad format errors */
+     addReplySds(c,sdscatprintf(sdsempty(),
+         "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n",
+         (char*)o->ptr,
+         (char*)c->argv[2]->ptr));
+ done:
+     /* Every path drops the reference taken by getDecodedObject(). */
+     decrRefCount(o);
+ }
+
+ /* CONFIG GET <pattern>.
+  *
+  * Replies with a multi bulk made of name/value pairs, one pair for every
+  * supported parameter whose name matches the glob-style pattern in
+  * c->argv[2]. The number of pairs is only known at the end, so an empty
+  * length object is queued first and its content is filled in last. */
+ void configGetCommand(redisClient *c) {
+     robj *o = getDecodedObject(c->argv[2]);
+     robj *lenobj = createObject(REDIS_STRING,NULL);
+     char *pattern = o->ptr;
+     int matches = 0;
+
+     /* Queue the placeholder for the multi bulk count. 'lenobj' is still
+      * written to at the end of the function, after this decrRefCount(). */
+     addReply(c,lenobj);
+     decrRefCount(lenobj);
+
+     if (stringmatch(pattern,"dbfilename",0)) {
+         addReplyBulkCString(c,"dbfilename");
+         addReplyBulkCString(c,server.dbfilename);
+         matches++;
+     }
+     if (stringmatch(pattern,"requirepass",0)) {
+         addReplyBulkCString(c,"requirepass");
+         addReplyBulkCString(c,server.requirepass);
+         matches++;
+     }
+     if (stringmatch(pattern,"masterauth",0)) {
+         addReplyBulkCString(c,"masterauth");
+         addReplyBulkCString(c,server.masterauth);
+         matches++;
+     }
+     if (stringmatch(pattern,"maxmemory",0)) {
+         char buf[128];
+
+         ll2string(buf,128,server.maxmemory);
+         addReplyBulkCString(c,"maxmemory");
+         addReplyBulkCString(c,buf);
+         matches++;
+     }
+     if (stringmatch(pattern,"timeout",0)) {
+         char buf[128];
+
+         ll2string(buf,128,server.maxidletime);
+         addReplyBulkCString(c,"timeout");
+         addReplyBulkCString(c,buf);
+         matches++;
+     }
+     if (stringmatch(pattern,"appendonly",0)) {
+         addReplyBulkCString(c,"appendonly");
+         addReplyBulkCString(c,server.appendonly ? "yes" : "no");
+         matches++;
+     }
+     if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) {
+         addReplyBulkCString(c,"no-appendfsync-on-rewrite");
+         addReplyBulkCString(c,server.no_appendfsync_on_rewrite ? "yes" : "no");
+         matches++;
+     }
+     if (stringmatch(pattern,"appendfsync",0)) {
+         char *policy;
+
+         switch(server.appendfsync) {
+         case APPENDFSYNC_NO: policy = "no"; break;
+         case APPENDFSYNC_EVERYSEC: policy = "everysec"; break;
+         case APPENDFSYNC_ALWAYS: policy = "always"; break;
+         default: policy = "unknown"; break; /* too harmless to panic */
+         }
+         addReplyBulkCString(c,"appendfsync");
+         addReplyBulkCString(c,policy);
+         matches++;
+     }
+     if (stringmatch(pattern,"save",0)) {
+         sds buf = sdsempty();
+         int j;
+
+         /* Render the save points as "sec changes sec changes ...". */
+         for (j = 0; j < server.saveparamslen; j++) {
+             /* 'seconds' is a time_t: cast it so the argument really has
+              * the type that %ld expects on every platform. */
+             buf = sdscatprintf(buf,"%ld %d",
+                     (long) server.saveparams[j].seconds,
+                     server.saveparams[j].changes);
+             if (j != server.saveparamslen-1)
+                 buf = sdscatlen(buf," ",1);
+         }
+         addReplyBulkCString(c,"save");
+         addReplyBulkCString(c,buf);
+         sdsfree(buf);
+         matches++;
+     }
+     decrRefCount(o);
+     /* Two bulk items (name and value) are emitted per match. */
+     lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+ }
+
+ /* CONFIG GET|SET|RESETSTAT dispatcher.
+  *
+  * Checks the argument count required by the subcommand in c->argv[1] and
+  * runs it. An unknown subcommand or a wrong argument count produces an
+  * error reply. */
+ void configCommand(redisClient *c) {
+     char *sub = c->argv[1]->ptr;
+
+     if (strcasecmp(sub,"set") == 0) {
+         if (c->argc == 4) {
+             configSetCommand(c);
+             return;
+         }
+     } else if (strcasecmp(sub,"get") == 0) {
+         if (c->argc == 3) {
+             configGetCommand(c);
+             return;
+         }
+     } else if (strcasecmp(sub,"resetstat") == 0) {
+         if (c->argc == 2) {
+             server.stat_numcommands = 0;
+             server.stat_numconnections = 0;
+             server.stat_expiredkeys = 0;
+             server.stat_starttime = time(NULL);
+             addReply(c,shared.ok);
+             return;
+         }
+     } else {
+         addReplySds(c,sdscatprintf(sdsempty(),
+             "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
+         return;
+     }
+
+     /* A known subcommand was called with the wrong number of arguments. */
+     addReplySds(c,sdscatprintf(sdsempty(),
+         "-ERR Wrong number of arguments for CONFIG %s\r\n",
+         (char*) c->argv[1]->ptr));
+ }
View
0  config.h → src/config.h
File renamed without changes
View
508 src/db.c
@@ -0,0 +1,508 @@
+#include "redis.h"
+
+#include <signal.h>
+
+/*-----------------------------------------------------------------------------
+ * C-level DB API
+ *----------------------------------------------------------------------------*/
+
+ /* Return the value stored at 'key' in 'db', or NULL if the key is missing.
+  *
+  * With VM enabled, a value that is in memory (or being swapped out) gets
+  * its access time refreshed, while a value living on disk is loaded back
+  * and stored again in the dictionary entry. */
+ robj *lookupKey(redisDb *db, robj *key) {
+     dictEntry *de = dictFind(db->dict,key->ptr);
+     robj *val;
+     int notify;
+
+     if (de == NULL) return NULL;
+
+     val = dictGetEntryVal(de);
+     if (!server.vm_enabled) return val;
+
+     if (val->storage == REDIS_VM_MEMORY ||
+         val->storage == REDIS_VM_SWAPPING)
+     {
+         /* If we were swapping the object out, cancel the operation */
+         if (val->storage == REDIS_VM_SWAPPING)
+             vmCancelThreadedIOJob(val);
+         /* Update the access time for the aging algorithm. */
+         val->lru = server.lruclock;
+         return val;
+     }
+
+     /* Our value was swapped on disk. Bring it at home. */
+     notify = (val->storage == REDIS_VM_LOADING);
+     redisAssert(val->type == REDIS_VMPOINTER);
+     val = vmLoadObject(val);
+     dictGetEntryVal(de) = val;
+
+     /* Clients blocked by the VM subsystem may be waiting for
+      * this key... */
+     if (notify) handleClientsBlockedOnSwappedKey(db,key);
+     return val;
+ }
+
+ /* Lookup for read operations: the key is first deleted if its expire time
+  * has passed, then looked up normally. */
+ robj *lookupKeyRead(redisDb *db, robj *key) {
+     robj *val;
+
+     expireIfNeeded(db,key);
+     val = lookupKey(db,key);
+     return val;
+ }
+
+ /* Lookup for write operations: a key with an expire set is deleted first,
+  * then the key is flagged as touched for WATCH before the lookup. */
+ robj *lookupKeyWrite(redisDb *db, robj *key) {
+     robj *val;
+
+     deleteIfVolatile(db,key);
+     touchWatchedKey(db,key);
+     val = lookupKey(db,key);
+     return val;
+ }
+
+ /* Like lookupKeyRead() on the client's DB, but when the key is missing
+  * 'reply' is sent to the client. Returns the value, or NULL. */
+ robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
+     robj *val = lookupKeyRead(c->db, key);
+
+     if (val == NULL) addReply(c,reply);
+     return val;
+ }
+
+ /* Like lookupKeyWrite() on the client's DB, but when the key is missing
+  * 'reply' is sent to the client. Returns the value, or NULL. */
+ robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
+     robj *val = lookupKeyWrite(c->db, key);
+
+     if (val == NULL) addReply(c,reply);
+     return val;
+ }
+
+ /* Add 'key' -> 'val' to the DB only if the key is not already there.
+  *
+  * Returns REDIS_ERR when the key exists (nothing is changed), REDIS_OK
+  * otherwise. On REDIS_OK the caller should increment the refcount of
+  * 'val'. The dictionary stores its own copy of the key string. */
+ int dbAdd(redisDb *db, robj *key, robj *val) {
+     sds keycopy;
+
+     /* The lookup comes first because the key string has to be
+      * duplicated before it can be handed to the dictionary. */
+     if (dictFind(db->dict, key->ptr) != NULL) return REDIS_ERR;
+
+     keycopy = sdsdup(key->ptr);
+     dictAdd(db->dict, keycopy, val);
+     return REDIS_OK;
+ }
+
+ /* Set 'key' to 'val' whether or not the key already exists.
+  *
+  * A missing key is added exactly like dbAdd() does and 1 is returned.
+  * An existing key has its value replaced and 0 is returned. */
+ int dbReplace(redisDb *db, robj *key, robj *val) {
+     if (dictFind(db->dict,key->ptr) != NULL) {
+         dictReplace(db->dict, key->ptr, val);
+         return 0;
+     }
+
+     dictAdd(db->dict, sdsdup(key->ptr), val);
+     return 1;
+ }
+
+ /* Return non-zero if 'key' is present in the main dictionary of 'db'. */
+ int dbExists(redisDb *db, robj *key) {
+     dictEntry *de = dictFind(db->dict,key->ptr);
+
+     return de != NULL;
+ }
+
+ /* Pick a random key from 'db' and return it as a new Redis string object.
+  * NULL is returned when the database has no keys.
+  *
+  * A candidate found to be already expired is deleted and another one is
+  * drawn, so the returned key is never an expired one. */
+ robj *dbRandomKey(redisDb *db) {
+     for (;;) {
+         struct dictEntry *de = dictGetRandomKey(db->dict);
+         sds name;
+         robj *keyobj;
+
+         if (de == NULL) return NULL;
+
+         name = dictGetEntryKey(de);
+         keyobj = createStringObject(name,sdslen(name));
+         /* Keep the key unless it has an expire that already fired. */
+         if (dictFind(db->expires,name) == NULL ||
+             !expireIfNeeded(db,keyobj))
+         {
+             return keyobj;
+         }
+         /* This one expired: release it and search for another key. */
+         decrRefCount(keyobj);
+     }
+ }
+
+ /* Remove 'key' from the DB together with its value and its expire entry,
+  * if any. Returns 1 if the key was found and removed, 0 otherwise. */
+ int dbDelete(redisDb *db, robj *key) {
+     int has_expires = dictSize(db->expires) > 0;
+
+     /* The expires dict shares the key sds with the main dict, so deleting
+      * the expire entry does not free the key string. */
+     if (has_expires) dictDelete(db->expires,key->ptr);
+     return dictDelete(db->dict,key->ptr) == DICT_OK;
+ }
+
+ /* Empty every database of the server, expires included.
+  * Returns the total number of keys that were stored before the flush. */
+ long long emptyDb() {
+     long long removed = 0;
+     int id = 0;
+
+     while (id < server.dbnum) {
+         redisDb *db = &server.db[id];
+
+         removed += dictSize(db->dict);
+         dictEmpty(db->dict);
+         dictEmpty(db->expires);
+         id++;
+     }
+     return removed;
+ }
+
+ /* Make database number 'id' the current DB of client 'c'.
+  * Returns REDIS_ERR, leaving the client untouched, if 'id' is not in
+  * the range 0 .. server.dbnum-1. Returns REDIS_OK otherwise. */
+ int selectDb(redisClient *c, int id) {
+     if (id >= 0 && id < server.dbnum) {
+         c->db = &server.db[id];
+         return REDIS_OK;
+     }
+     return REDIS_ERR;
+ }
+
+/*-----------------------------------------------------------------------------
+ * Type agnostic commands operating on the key space
+ *----------------------------------------------------------------------------*/
+
+ /* FLUSHDB: remove every key (and expire) of the client's current DB.
+  * The dirty counter grows by the number of keys removed. */
+ void flushdbCommand(redisClient *c) {
+     redisDb *db = c->db;
+
+     server.dirty += dictSize(db->dict);
+     touchWatchedKeysOnFlush(db->id);
+     dictEmpty(db->dict);
+     dictEmpty(db->expires);
+     addReply(c,shared.ok);
+ }
+
+ /* FLUSHALL: remove every key from every database.
+  *
+  * A background save in progress is killed and its temp file removed,
+  * then the (now empty) dataset is saved synchronously. */
+ void flushallCommand(redisClient *c) {
+     pid_t child = server.bgsavechildpid;
+
+     touchWatchedKeysOnFlush(-1);
+     server.dirty += emptyDb();
+     addReply(c,shared.ok);
+     if (child != -1) {
+         kill(child,SIGKILL);
+         rdbRemoveTempFile(child);
+     }
+     rdbSave(server.dbfilename);
+     server.dirty++;
+ }
+
+ /* DEL key [key ...]: delete the given keys and reply with the number of
+  * keys that actually existed and were removed. */
+ void delCommand(redisClient *c) {
+     int removed = 0;
+     int i = 1;
+
+     while (i < c->argc) {
+         robj *key = c->argv[i++];
+
+         if (!dbDelete(c->db,key)) continue;
+         touchWatchedKey(c->db,key);
+         server.dirty++;
+         removed++;
+     }
+     addReplyLongLong(c,removed);
+ }
+
+ /* EXISTS key: reply 1 if the key exists, 0 otherwise. A key whose expire
+  * time has passed is deleted first, so it counts as missing. */
+ void existsCommand(redisClient *c) {
+     robj *key = c->argv[1];
+
+     expireIfNeeded(c->db,key);
+     addReply(c, dbExists(c->db,key) ? shared.cone : shared.czero);
+ }
+
+ /* SELECT index: switch the client to the database with the given index.
+  *
+  * The index must be a complete base-10 integer naming an existing DB.
+  * Unlike a plain atoi(), an argument such as "abc" or "1x" is rejected
+  * with an error instead of silently selecting DB 0 or DB 1. */
+ void selectCommand(redisClient *c) {
+     char *arg = c->argv[1]->ptr;
+     char *eptr;
+     long id;
+
+     errno = 0;
+     id = strtol(arg,&eptr,10);
+     if (eptr == arg || *eptr != '\0' || errno == ERANGE || /* not a number */
+         id < INT_MIN || id > INT_MAX ||                    /* int overflow */
+         selectDb(c,(int)id) == REDIS_ERR)                  /* no such DB */
+     {
+         addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
+         return;
+     }
+     addReply(c,shared.ok);
+ }
+
+ /* RANDOMKEY: reply with a random key of the current DB as a bulk string,
+  * or with a null bulk when the DB has no keys. */
+ void randomkeyCommand(redisClient *c) {
+     robj *key = dbRandomKey(c->db);
+
+     if (key != NULL) {
+         addReplyBulk(c,key);
+         decrRefCount(key);
+     } else {
+         addReply(c,shared.nullbulk);
+     }
+ }
+
+ /* KEYS pattern: reply with a multi bulk of all the keys in the current DB
+  * matching the glob-style pattern. Keys found to be expired are deleted
+  * and left out of the reply.
+  *
+  * The number of elements is only known after the scan, so an empty length
+  * object is queued first and its content is filled in at the end. */
+ void keysCommand(redisClient *c) {
+     sds pattern = c->argv[1]->ptr;
+     int plen = sdslen(pattern);
+     /* A bare "*" matches everything: no need to run the matcher. */
+     int allkeys = (pattern[0] == '*' && pattern[1] == '\0');
+     unsigned long numkeys = 0;
+     robj *lenobj = createObject(REDIS_STRING,NULL);
+     dictIterator *di = dictGetIterator(c->db->dict);
+     dictEntry *de;
+
+     addReply(c,lenobj);
+     decrRefCount(lenobj);
+     while((de = dictNext(di)) != NULL) {
+         sds name = dictGetEntryKey(de);
+         robj *keyobj;
+
+         if (!allkeys && !stringmatchlen(pattern,plen,name,sdslen(name),0))
+             continue;
+
+         keyobj = createStringObject(name,sdslen(name));
+         if (expireIfNeeded(c->db,keyobj) == 0) {
+             addReplyBulk(c,keyobj);
+             numkeys++;
+         }
+         decrRefCount(keyobj);
+     }
+     dictReleaseIterator(di);
+     lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",numkeys);
+ }
+
+ /* DBSIZE: reply with the number of keys in the current DB as an integer. */
+ void dbsizeCommand(redisClient *c) {
+     sds reply = sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict));
+
+     addReplySds(c,reply);
+ }
+
+ /* LASTSAVE: reply with server.lastsave as an integer.
+  *
+  * server.lastsave is printed with %lu, so it is cast explicitly: the
+  * argument then has the type the conversion expects on every platform. */
+ void lastsaveCommand(redisClient *c) {
+     addReplySds(c,
+         sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long) server.lastsave));
+ }
+
+ /* TYPE key: reply with a status line naming the type of the value stored
+  * at key ("string", "list", "set", "zset", "hash"), or "none" when the
+  * key does not exist. */
+ void typeCommand(redisClient *c) {
+     robj *o = lookupKeyRead(c->db,c->argv[1]);
+     char *type = "+none";
+
+     if (o != NULL) {
+         if (o->type == REDIS_STRING) type = "+string";
+         else if (o->type == REDIS_LIST) type = "+list";
+         else if (o->type == REDIS_SET) type = "+set";
+         else if (o->type == REDIS_ZSET) type = "+zset";
+         else if (o->type == REDIS_HASH) type = "+hash";
+         else type = "+unknown";
+     }
+     addReplySds(c,sdsnew(type));
+     addReply(c,shared.crlf);
+ }
+
+ /* SAVE: save the dataset to server.dbfilename synchronously. Refused with
+  * an error while a background save is in progress. */
+ void saveCommand(redisClient *c) {
+     int saved;
+
+     if (server.bgsavechildpid != -1) {
+         addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
+         return;
+     }
+     saved = (rdbSave(server.dbfilename) == REDIS_OK);
+     addReply(c, saved ? shared.ok : shared.err);
+ }
+
+ /* BGSAVE: start saving the dataset to server.dbfilename in the
+  * background. Refused with an error if a background save is already
+  * running. */
+ void bgsaveCommand(redisClient *c) {
+     if (server.bgsavechildpid != -1) {
+         addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
+         return;
+     }
+     if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
+         addReply(c,shared.err);
+         return;
+     }
+     addReplySds(c,sdsnew("+Background saving started\r\n"));
+ }
+
+ /* SHUTDOWN: exit the process with status 0 if prepareForShutdown()
+  * succeeds. Otherwise the server keeps running and the client gets an
+  * error. */
+ void shutdownCommand(redisClient *c) {
+     if (prepareForShutdown() != REDIS_OK) {
+         addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n"));
+         return;
+     }
+     exit(0);
+ }
+
+ /* Shared implementation of RENAME (nx == 0) and RENAMENX (nx != 0).
+  *
+  * c->argv[1] is the source key, c->argv[2] the destination. With 'nx' set
+  * an existing destination is left alone and 0 is replied; otherwise the
+  * destination is overwritten. */
+ void renameGenericCommand(redisClient *c, int nx) {
+     robj *srckey = c->argv[1];
+     robj *dstkey = c->argv[2];
+     robj *o;
+     int dst_exists;
+
+     /* To use the same key as src and dst is probably an error */
+     if (sdscmp(srckey->ptr,dstkey->ptr) == 0) {
+         addReply(c,shared.sameobjecterr);
+         return;
+     }
+
+     o = lookupKeyWriteOrReply(c,srckey,shared.nokeyerr);
+     if (o == NULL) return;
+
+     /* The value is about to be referenced by the destination key too. */
+     incrRefCount(o);
+     deleteIfVolatile(c->db,dstkey);
+     dst_exists = (dbAdd(c->db,dstkey,o) == REDIS_ERR);
+     if (dst_exists && nx) {
+         decrRefCount(o);
+         addReply(c,shared.czero);
+         return;
+     }
+     if (dst_exists) dbReplace(c->db,dstkey,o);
+
+     dbDelete(c->db,srckey);
+     touchWatchedKey(c->db,dstkey);
+     server.dirty++;
+     addReply(c,nx ? shared.cone : shared.ok);
+ }
+
+ /* RENAME src dst: rename, overwriting dst if it already exists. */
+ void renameCommand(redisClient *c)
+ {
+     const int nx = 0;
+
+     renameGenericCommand(c,nx);
+ }
+
+ /* RENAMENX src dst: rename only if dst does not already exist. */
+ void renamenxCommand(redisClient *c)
+ {
+     const int nx = 1;
+
+     renameGenericCommand(c,nx);
+ }
+
+ /* MOVE key db: move the key in c->argv[1] from the current DB to the DB
+  * whose index is given in c->argv[2].
+  *
+  * Replies 1 if the key was moved. Replies 0 if the key is missing in the
+  * source DB or already present in the target one. An out of range or
+  * identical target DB is an error. */
+ void moveCommand(redisClient *c) {
+     robj *key = c->argv[1];
+     redisDb *from = c->db;
+     redisDb *to;
+     int fromid = c->db->id;
+     robj *val;
+
+     /* Resolve the target DB by selecting it for a moment, then switch
+      * the client back to the source DB. */
+     if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
+         addReply(c,shared.outofrangeerr);
+         return;
+     }
+     to = c->db;
+     selectDb(c,fromid);
+
+     /* If the user is moving using as target the same
+      * DB as the source DB it is probably an error. */
+     if (from == to) {
+         addReply(c,shared.sameobjecterr);
+         return;
+     }
+
+     /* Check if the element exists and get a reference */
+     val = lookupKeyWrite(c->db,key);
+     if (val == NULL) {
+         addReply(c,shared.czero);
+         return;
+     }
+
+     /* Try to add the element to the target DB */
+     deleteIfVolatile(to,key);
+     if (dbAdd(to,key,val) == REDIS_ERR) {
+         addReply(c,shared.czero);
+         return;
+     }
+     incrRefCount(val);
+
+     /* OK! key moved, free the entry in the source DB */
+     dbDelete(from,key);
+     server.dirty++;
+     addReply(c,shared.cone);
+ }
+
+/*-----------------------------------------------------------------------------
+ * Expires API
+ *----------------------------------------------------------------------------*/
+
+ /* Remove the expire associated with 'key'. Returns 1 if an expire was
+  * removed, 0 if the key had none. The key must exist in the main dict. */
+ int removeExpire(redisDb *db, robj *key) {
+     /* An expire may only be removed if there is a corresponding entry in the
+      * main dict. Otherwise, the key will never be freed. */
+     redisAssert(dictFind(db->dict,key->ptr) != NULL);
+     return (dictDelete(db->expires,key->ptr) == DICT_OK) ? 1 : 0;
+ }
+
+ /* Associate the expire time 'when' with 'key', which must already exist
+  * in the main dictionary.
+  *
+  * Returns 1 if the expire was set, 0 if dictAdd() refused the entry
+  * (DICT_ERR). */
+ int setExpire(redisDb *db, robj *key, time_t when) {
+     dictEntry *de;
+
+     /* The lookup is done outside the assertion, so 'de' is assigned
+      * whatever redisAssert() expands to. */
+     de = dictFind(db->dict,key->ptr);
+     redisAssert(de != NULL);
+
+     /* Reuse the sds from the main dict in the expire dict */
+     if (dictAdd(db->expires,dictGetEntryKey(de),(void*)when) == DICT_ERR) {
+         return 0;
+     } else {
+         return 1;
+     }
+ }
+
+ /* Return the expire time of the specified key, or -1 if the key has no
+  * associated expire (i.e. the key is non volatile). */
+ time_t getExpire(redisDb *db, robj *key) {
+     dictEntry *de;
+
+     /* No expire? return ASAP */
+     if (dictSize(db->expires) == 0) return -1;
+     de = dictFind(db->expires,key->ptr);
+     if (de == NULL) return -1;
+
+     /* Safety check: a key found in the expires dict must also be in the
+      * main dict. */
+     redisAssert(dictFind(db->dict,key->ptr) != NULL);
+     return (time_t) dictGetEntryVal(de);
+ }
+
+ /* Delete 'key' if it has an expire time that is already in the past.
+  *
+  * Returns 0 when the key has no expire or has not expired yet. Otherwise
+  * the counters are updated and the result of dbDelete() is returned. */
+ int expireIfNeeded(redisDb *db, robj *key) {
+     time_t when = getExpire(db,key);
+
+     /* Nothing to do for persistent keys and for keys still alive. */
+     if (when < 0 || time(NULL) <= when) return 0;
+
+     /* Delete the key */
+     server.stat_expiredkeys++;
+     server.dirty++;
+     return dbDelete(db,key);
+ }
+
+ /* Delete 'key' if it has an expire set, whether or not the expire time
+  * has passed.
+  *
+  * Returns 0 when the key has no expire. Otherwise the counters are
+  * updated and the result of dbDelete() is returned. */
+ int deleteIfVolatile(redisDb *db, robj *key) {
+     time_t when = getExpire(db,key);
+
+     if (when < 0) return 0;
+
+     /* Delete the key */
+     server.stat_expiredkeys++;
+     server.dirty++;
+     return dbDelete(db,key);
+ }
+
+/*-----------------------------------------------------------------------------
+ * Expires Commands
+ *----------------------------------------------------------------------------*/
+
+ /* Shared implementation of EXPIRE and EXPIREAT.
+  *
+  * 'param' holds the number sent by the client and 'offset' is subtracted
+  * from it to get a relative timeout in seconds (EXPIRE passes 0, EXPIREAT
+  * passes the current time).
+  *
+  * Replies 0 if the key does not exist or the expire could not be set,
+  * 1 otherwise. A timeout that is zero or negative deletes the key. */
+ void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
+     dictEntry *de;
+     /* Declared as long, not time_t: the parser below is handed a pointer
+      * to a long, and the two types are not guaranteed to be the same. */
+     long seconds;
+
+     if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
+
+     seconds -= offset;
+
+     de = dictFind(c->db->dict,key->ptr);
+     if (de == NULL) {
+         addReply(c,shared.czero);
+         return;
+     }
+     if (seconds <= 0) {
+         /* Already in the past: just remove the key. */
+         if (dbDelete(c->db,key)) server.dirty++;
+         addReply(c, shared.cone);
+         return;
+     } else {
+         time_t when = time(NULL)+seconds;
+         if (setExpire(c->db,key,when)) {
+             addReply(c,shared.cone);
+             server.dirty++;
+         } else {
+             addReply(c,shared.czero);
+         }
+         return;
+     }
+ }
+
+ /* EXPIRE key seconds: the argument is already a relative timeout. */
+ void expireCommand(redisClient *c)
+ {
+     robj *key = c->argv[1];
+     robj *seconds = c->argv[2];
+
+     expireGenericCommand(c,key,seconds,0);
+ }
+
+ /* EXPIREAT key timestamp: the current time is passed as offset, so the
+  * absolute timestamp is turned into a relative timeout. */
+ void expireatCommand(redisClient *c)
+ {
+     robj *key = c->argv[1];
+     robj *timestamp = c->argv[2];
+
+     expireGenericCommand(c,key,timestamp,time(NULL));
+ }
+
+ /* TTL key: reply with the seconds left before the key expires, or -1 if
+  * the key has no expire or the computed time left is negative. */
+ void ttlCommand(redisClient *c) {
+     time_t expire = getExpire(c->db,c->argv[1]);
+     int ttl;
+
+     if (expire == -1) {
+         ttl = -1;
+     } else {
+         ttl = (int) (expire-time(NULL));
+         if (ttl < 0) ttl = -1;
+     }
+     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
+ }
+
+
View
309 src/debug.c
@@ -0,0 +1,309 @@
+#include "redis.h"
+#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */
+
+/* ================================= Debugging ============================== */
+
+/* Compute the SHA1 of the 'len' bytes starting at 'ptr' and xor the
+ * resulting 20 bytes into the 20 bytes buffer pointed by 'digest'.
+ *
+ * Since xor is commutative, this operation is used in order to "add"
+ * digests relative to unordered elements.
+ *
+ * So digest(a,b,c,d) will be the same of digest(b,a,c,d) */
+void xorDigest(unsigned char *digest, void *ptr, size_t len) {
+    SHA1_CTX sha;
+    unsigned char sum[20];
+    unsigned char *data = ptr;
+    int i = 0;
+
+    SHA1Init(&sha);
+    SHA1Update(&sha,data,len);
+    SHA1Final(sum,&sha);
+
+    while (i < 20) {
+        digest[i] ^= sum[i];
+        i++;
+    }
+}
+
+/* Like xorDigest() but the data is the string held by the object 'o'.
+ * The object obtained from getDecodedObject() is used for hashing and is
+ * released with decrRefCount() before returning. */
+void xorObjectDigest(unsigned char *digest, robj *o) {
+    robj *decoded = getDecodedObject(o);
+
+    xorDigest(digest,decoded->ptr,sdslen(decoded->ptr));
+    decrRefCount(decoded);
+}
+
+/* Instead of just computing the SHA1 of the data and xoring it against
+ * 'digest', this function also hashes the resulting 20 bytes and stores
+ * the outcome back into 'digest', replacing the old value.
+ *
+ * So the final digest will be:
+ *
+ * digest = SHA1(digest xor SHA1(data))
+ *
+ * This function is used every time we want to preserve the order so
+ * that digest(a,b,c,d) will be different than digest(b,c,d,a)
+ *
+ * Also note that mixdigest("foo") followed by mixdigest("bar")
+ * will lead to a different digest compared to "fo", "obar".
+ */
+void mixDigest(unsigned char *digest, void *ptr, size_t len) {
+    SHA1_CTX sha;
+
+    /* Step 1: digest = digest xor SHA1(data) */
+    xorDigest(digest,ptr,len);
+
+    /* Step 2: digest = SHA1(digest) */
+    SHA1Init(&sha);
+    SHA1Update(&sha,digest,20);
+    SHA1Final(digest,&sha);
+}
+
+/* Like mixDigest() but the data is the string held by the object 'o'.
+ * The object obtained from getDecodedObject() is used for hashing and is
+ * released with decrRefCount() before returning. */
+void mixObjectDigest(unsigned char *digest, robj *o) {
+    robj *decoded = getDecodedObject(o);
+
+    mixDigest(digest,decoded->ptr,sdslen(decoded->ptr));
+    decrRefCount(decoded);
+}
+
+/* Compute the dataset digest. Since keys, sets elements, hashes elements
+ * are not ordered, we use a trick: every aggregate digest is the xor
+ * of the digests of their elements. This way the order will not change
+ * the result. For list instead we use a feedback entering the output digest
+ * as input in order to ensure that a different ordered list will result in
+ * a different digest.
+ *
+ * 'final' is the output buffer: its first 20 bytes are zeroed and then
+ * filled with the resulting digest. Databases holding no keys are skipped
+ * and do not contribute to the result.
+ *
+ * For every key a 20 bytes key-val digest is built mixing, in order, the
+ * key name, the object type and the value. When the key has an expire only
+ * a constant marker is added, not the expire time itself. */
+void computeDatasetDigest(unsigned char *final) {
+ unsigned char digest[20];
+ char buf[128];
+ dictIterator *di = NULL;
+ dictEntry *de;
+ int j;
+ uint32_t aux;
+
+ memset(final,0,20); /* Start with a clean result */
+
+ for (j = 0; j < server.dbnum; j++) {
+ redisDb *db = server.db+j;
+
+ if (dictSize(db->dict) == 0) continue;
+ di = dictGetIterator(db->dict);
+
+ /* hash the DB id, so the same dataset moved in a different
+ * DB will lead to a different digest. The id is converted with
+ * htonl() before being hashed. */
+ aux = htonl(j);
+ mixDigest(final,&aux,sizeof(aux));
+
+ /* Iterate this DB writing every entry */
+ while((de = dictNext(di)) != NULL) {
+ sds key;
+ robj *keyobj, *o;
+ time_t expiretime;
+
+ memset(digest,0,20); /* This key-val digest */
+ key = dictGetEntryKey(de);
+ keyobj = createStringObject(key,sdslen(key));
+
+ mixDigest(digest,key,sdslen(key));
+
+ /* Make sure the key is loaded if VM is active.
+ * NOTE(review): 'o' is dereferenced below without a NULL check,
+ * confirm lookupKeyRead() can't return NULL here. */
+ o = lookupKeyRead(db,keyobj);
+
+ aux = htonl(o->type);
+ mixDigest(digest,&aux,sizeof(aux));
+ expiretime = getExpire(db,keyobj);
+
+ /* Save the key and associated value */
+ if (o->type == REDIS_STRING) {
+ mixObjectDigest(digest,o);
+ } else if (o->type == REDIS_LIST) {
+ listTypeIterator *li = listTypeInitIterator(o,0,REDIS_TAIL);
+ listTypeEntry entry;
+ while(listTypeNext(li,&entry)) {
+ robj *eleobj = listTypeGet(&entry);
+ mixObjectDigest(digest,eleobj);
+ decrRefCount(eleobj);
+ }
+ listTypeReleaseIterator(li);
+ } else if (o->type == REDIS_SET) {
+ dict *set = o->ptr;
+ dictIterator *di = dictGetIterator(set); /* shadows the outer 'di' */
+ dictEntry *de; /* shadows the outer 'de' */
+
+ while((de = dictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+
+ xorObjectDigest(digest,eleobj);
+ }
+ dictReleaseIterator(di);
+ } else if (o->type == REDIS_ZSET) {
+ zset *zs = o->ptr;
+ dictIterator *di = dictGetIterator(zs->dict); /* shadows the outer 'di' */
+ dictEntry *de; /* shadows the outer 'de' */
+
+ while((de = dictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ double *score = dictGetEntryVal(de);
+ unsigned char eledigest[20];
+
+ /* The element digest mixes the member and then its score
+ * formatted as text; elements are xored together. */
+ snprintf(buf,sizeof(buf),"%.17g",*score);
+ memset(eledigest,0,20);
+ mixObjectDigest(eledigest,eleobj);
+ mixDigest(eledigest,buf,strlen(buf));
+ xorDigest(digest,eledigest,20);
+ }
+ dictReleaseIterator(di);
+ } else if (o->type == REDIS_HASH) {
+ hashTypeIterator *hi;
+ robj *obj;
+
+ hi = hashTypeInitIterator(o);
+ while (hashTypeNext(hi) != REDIS_ERR) {
+ unsigned char eledigest[20];
+
+ /* The element digest mixes the field and then its value;
+ * elements are xored together. */
+ memset(eledigest,0,20);
+ obj = hashTypeCurrent(hi,REDIS_HASH_KEY);
+ mixObjectDigest(eledigest,obj);
+ decrRefCount(obj);
+ obj = hashTypeCurrent(hi,REDIS_HASH_VALUE);
+ mixObjectDigest(eledigest,obj);
+ decrRefCount(obj);
+ xorDigest(digest,eledigest,20);
+ }
+ hashTypeReleaseIterator(hi);
+ } else {
+ redisPanic("Unknown object type");
+ }
+ /* If the key has an expire, add it to the mix */
+ if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
+ /* We can finally xor the key-val digest to the final digest */
+ xorDigest(final,digest,20);
+ decrRefCount(keyobj);
+ }
+ dictReleaseIterator(di);
+ }
+}
+
+/* Implementation of the DEBUG command. The subcommand is read from argv[1]
+ * and compared case insensitively:
+ *
+ * SEGFAULT       - write to an invalid address on purpose.
+ * RELOAD         - save the dataset with rdbSave(), empty it, load it back.
+ * LOADAOF        - empty the dataset and load the append only file.
+ * OBJECT <key>   - reply with information about the value stored at key.
+ * SWAPIN <key>   - look up the key with lookupKeyRead().
+ * SWAPOUT <key>  - swap the value out with vmSwapObjectBlocking().
+ * POPULATE <n>   - add the keys key:0 ... key:<n-1> that do not exist yet.
+ * DIGEST         - reply with the dataset digest as 40 hex digits.
+ *
+ * Anything else gets a syntax error reply.
+ * NOTE(review): the syntax error text does not list LOADAOF, POPULATE and
+ * DIGEST. */
+void debugCommand(redisClient *c) {
+    if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
+        *((char*)-1) = 'x';
+    } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
+        if (rdbSave(server.dbfilename) != REDIS_OK) {
+            addReply(c,shared.err);
+            return;
+        }
+        emptyDb();
+        if (rdbLoad(server.dbfilename) != REDIS_OK) {
+            addReply(c,shared.err);
+            return;
+        }
+        redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
+        addReply(c,shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
+        emptyDb();
+        if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
+            addReply(c,shared.err);
+            return;
+        }
+        redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
+        addReply(c,shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
+        dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
+        robj *val;
+
+        if (!de) {
+            addReply(c,shared.nokeyerr);
+            return;
+        }
+        val = dictGetEntryVal(de);
+        if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY ||
+                                   val->storage == REDIS_VM_SWAPPING)) {
+            char *strenc;
+
+            strenc = strEncoding(val->encoding);
+            addReplySds(c,sdscatprintf(sdsempty(),
+                "+Value at:%p refcount:%d "
+                "encoding:%s serializedlength:%lld\r\n",
+                (void*)val, val->refcount,
+                strenc, (long long) rdbSavedObjectLen(val,NULL)));
+        } else {
+            /* For any other storage value the entry is handled as a
+             * vmpointer. */
+            vmpointer *vp = (vmpointer*) val;
+            addReplySds(c,sdscatprintf(sdsempty(),
+                "+Value swapped at: page %llu "
+                "using %llu pages\r\n",
+                (unsigned long long) vp->page,
+                (unsigned long long) vp->usedpages));
+        }
+    } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
+        lookupKeyRead(c->db,c->argv[2]);
+        addReply(c,shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
+        dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
+        robj *val;
+        vmpointer *vp;
+
+        if (!server.vm_enabled) {
+            addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n"));
+            return;
+        }
+        if (!de) {
+            addReply(c,shared.nokeyerr);
+            return;
+        }
+        val = dictGetEntryVal(de);
+        /* Swap it */
+        if (val->storage != REDIS_VM_MEMORY) {
+            addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
+        } else if (val->refcount != 1) {
+            addReplySds(c,sdsnew("-ERR Object is shared\r\n"));
+        } else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
+            dictGetEntryVal(de) = vp;
+            addReply(c,shared.ok);
+        } else {
+            addReply(c,shared.err);
+        }
+    } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
+        long keys, j;
+        robj *key, *val;
+        char buf[128];
+
+        if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != REDIS_OK)
+            return;
+        for (j = 0; j < keys; j++) {
+            /* 'j' is a signed long, so %ld is the matching conversion. */
+            snprintf(buf,sizeof(buf),"key:%ld",j);
+            key = createStringObject(buf,strlen(buf));
+            if (lookupKeyRead(c->db,key) != NULL) {
+                /* Key already there: leave it untouched. */
+                decrRefCount(key);
+                continue;
+            }
+            snprintf(buf,sizeof(buf),"value:%ld",j);
+            val = createStringObject(buf,strlen(buf));
+            dbAdd(c->db,key,val);
+            decrRefCount(key);
+        }
+        addReply(c,shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
+        unsigned char digest[20];
+        sds d = sdsnew("+");
+        int j;
+
+        computeDatasetDigest(digest);
+        for (j = 0; j < 20; j++)
+            d = sdscatprintf(d, "%02x",digest[j]);
+
+        d = sdscatlen(d,"\r\n",2);
+        addReplySds(c,d);
+    } else {
+        addReplySds(c,sdsnew(
+            "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]\r\n"));
+    }
+}
+
+/* Log a failed assertion at REDIS_WARNING level: 'estr' is the text of the
+ * expression that was not true, 'file' and 'line' the source location.
+ *
+ * When HAVE_BACKTRACE is defined an invalid address is written on purpose
+ * to force a SIGSEGV, in order to print the stack trace. Otherwise the
+ * function just returns after logging.
+ * NOTE(review): presumably the caller terminates the process, confirm. */
+void _redisAssert(char *estr, char *file, int line) {
+ redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
+ redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true",file,line,estr);
+#ifdef HAVE_BACKTRACE
+ redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
+ *((char*)-1) = 'x';
+#endif
+}
+
+/* Log a fatal error at REDIS_WARNING level: 'msg' is the panic message,
+ * 'file' and 'line' the source location.
+ *
+ * When HAVE_BACKTRACE is defined an invalid address is written on purpose
+ * to force a SIGSEGV, in order to print the stack trace. Otherwise the
+ * function just returns after logging.
+ * NOTE(review): presumably the caller terminates the process, confirm. */
+void _redisPanic(char *msg, char *file, int line) {
+ redisLog(REDIS_WARNING,"!!! Software Failure. Press left mouse button to continue");
+ redisLog(REDIS_WARNING,"Guru Meditation: %s #%s:%d",msg,file,line);
+#ifdef HAVE_BACKTRACE
+ redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
+ *((char*)-1) = 'x';
+#endif
+}
View
0  dict.c → src/dict.c
File renamed without changes
View
0  dict.h → src/dict.h
File renamed without changes
View
0  fmacros.h → src/fmacros.h
File renamed without changes
View
0  linenoise.c → src/linenoise.c
File renamed without changes
View
0  linenoise.h → src/linenoise.h
File renamed without changes
View
0  lzf.h → src/lzf.h
File renamed without changes
View
0  lzfP.h → src/lzfP.h
File renamed without changes
View
0  lzf_c.c → src/lzf_c.c
File renamed without changes
View
0  lzf_d.c → src/lzf_d.c
File renamed without changes
View
0  mkreleasehdr.sh → src/mkreleasehdr.sh
File renamed without changes
View
266 src/multi.c
@@ -0,0 +1,266 @@
+#include "redis.h"
+
+/* ================================ MULTI/EXEC ============================== */
+
+/* Reset the MULTI/EXEC state of the client to an empty commands queue.
+ * Nothing is freed here. */
+void initClientMultiState(redisClient *c) {
+    c->mstate.count = 0;
+    c->mstate.commands = NULL;
+}
+
+/* Free the queue of MULTI commands of the client: every queued argument
+ * loses one reference, then the argument vectors and the queue itself
+ * are freed. The state fields are not reset here. */
+void freeClientMultiState(redisClient *c) {
+    int idx = 0;
+
+    while (idx < c->mstate.count) {
+        multiCmd *queued = &c->mstate.commands[idx];
+        int arg = 0;
+
+        while (arg < queued->argc) {
+            decrRefCount(queued->argv[arg]);
+            arg++;
+        }
+        zfree(queued->argv);
+        idx++;
+    }
+    zfree(c->mstate.commands);
+}
+
+/* Append the command the client is currently issuing to its MULTI queue.
+ * The queue grows by one slot; the slot gets 'cmd' and a private copy of
+ * the argument vector, with every argument gaining one reference. */
+void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
+    int slot = c->mstate.count;
+    multiCmd *entry;
+    int i;
+
+    c->mstate.commands = zrealloc(c->mstate.commands,
+                                  sizeof(multiCmd)*(slot+1));
+    entry = &c->mstate.commands[slot];
+    entry->cmd = cmd;
+    entry->argc = c->argc;
+    entry->argv = zmalloc(sizeof(robj*)*c->argc);
+    memcpy(entry->argv,c->argv,sizeof(robj*)*c->argc);
+    for (i = 0; i < c->argc; i++) {
+        incrRefCount(entry->argv[i]);
+    }
+    c->mstate.count = slot+1;
+}
+
+/* Flag the client as being inside a MULTI block and reply OK. If the flag
+ * is already set an error is returned instead, as nesting is not allowed. */
+void multiCommand(redisClient *c) {
+    if ((c->flags & REDIS_MULTI) == 0) {
+        c->flags |= REDIS_MULTI;
+        addReply(c,shared.ok);
+    } else {
+        addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n"));
+    }
+}
+
+/* Abort the MULTI block of the client: the queued commands are dropped,
+ * the MULTI flag is cleared and all the keys are unwatched. An error is
+ * returned if the client is not inside a MULTI block. */
+void discardCommand(redisClient *c) {
+    if (c->flags & REDIS_MULTI) {
+        freeClientMultiState(c);
+        initClientMultiState(c);
+        c->flags &= (~REDIS_MULTI);
+        unwatchAllKeys(c);
+        addReply(c,shared.ok);
+    } else {
+        addReplySds(c,sdsnew("-ERR DISCARD without MULTI\r\n"));
+    }
+}
+
+/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
+ * implementation for more information.
+ *
+ * The AOF is fed only when server.appendonly is set, the slaves only when
+ * there is at least one of them. */
+void execCommandReplicateMulti(redisClient *c) {
+    robj *argv[1];
+    struct redisCommand *multicmd;
+
+    argv[0] = createStringObject("MULTI",5);
+    multicmd = lookupCommand("multi");
+
+    if (server.appendonly) {
+        feedAppendOnlyFile(multicmd,c->db->id,argv,1);
+    }
+    if (listLength(server.slaves)) {
+        replicationFeedSlaves(server.slaves,c->db->id,argv,1);
+    }
+    decrRefCount(argv[0]);
+}
+
+/* Run all the commands queued by the client since MULTI.
+ *
+ * An error is returned if the client is not inside a MULTI block. If the
+ * client is flagged REDIS_DIRTY_CAS the queue is discarded and a null
+ * multi bulk is returned. Otherwise a multi bulk header with the number of
+ * queued commands is sent and every command is run with call(). */
+void execCommand(redisClient *c) {
+    robj **saved_argv;
+    int saved_argc;
+    int i;
+
+    if ((c->flags & REDIS_MULTI) == 0) {
+        addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
+        return;
+    }
+
+    /* Some WATCHed key was touched: abort the EXEC. A failed EXEC
+     * returns a multi bulk nil object. */
+    if (c->flags & REDIS_DIRTY_CAS) {
+        freeClientMultiState(c);
+        initClientMultiState(c);
+        c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
+        unwatchAllKeys(c);
+        addReply(c,shared.nullmultibulk);
+        return;
+    }
+
+    /* Now that we are sure the block is executed, replicate a MULTI
+     * request: the MULTI/..../EXEC block is delivered as a whole, so the
+     * AOF and the replication link have the same consistency and atomicity
+     * guarantees. */
+    execCommandReplicateMulti(c);
+
+    /* Unwatch ASAP otherwise we'll waste CPU cycles */
+    unwatchAllKeys(c);
+
+    /* Run the queue, temporarily replacing the client argument vector
+     * with the one of each queued command. */
+    saved_argc = c->argc;
+    saved_argv = c->argv;
+    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
+    i = 0;
+    while (i < c->mstate.count) {
+        multiCmd *queued = &c->mstate.commands[i];
+
+        c->argc = queued->argc;
+        c->argv = queued->argv;
+        call(c,queued->cmd);
+        i++;
+    }
+    c->argc = saved_argc;
+    c->argv = saved_argv;
+
+    freeClientMultiState(c);
+    initClientMultiState(c);
+    c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
+
+    /* The MULTI command is always sent, as we can't know beforehand if the
+     * next operations will contain at least a modification to the DB, so
+     * make sure the EXEC command is always replicated / AOF too. */
+    server.dirty++;
+}
+
+/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
+ *
+ * The implementation uses a per-DB hash table mapping keys to the list of
+ * clients WATCHing those keys, so that given a key that is going to be
+ * modified we can mark all the associated clients as dirty.
+ *
+ * Also every client contains a list of WATCHed keys so that it is possible
+ * to un-watch such keys when the client is freed or when UNWATCH is called. */
+
+/* The client->watched_keys list holds watchedKey structures, because in
+ * order to identify a key in Redis we need both the key name and the
+ * DB. */
+typedef struct watchedKey {
+ robj *key; /* Name of the watched key */
+ redisDb *db; /* DB where the key is watched */
+} watchedKey;
+
+/* Make the client watch 'key' in its currently selected DB. Nothing is
+ * done if the client is already watching that key in that DB.
+ *
+ * The key gains one reference for the client list entry, plus one more
+ * when it is added as a new entry of the per-DB table. */
+void watchForKey(redisClient *c, robj *key) {
+    listIter iter;
+    listNode *node;
+    list *watchers;
+    watchedKey *entry;
+
+    /* Stop here if the same key is already watched in the same DB. */
+    listRewind(c->watched_keys,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        entry = listNodeValue(node);
+        if (entry->db != c->db) continue;
+        if (equalStringObjects(key,entry->key)) return;
+    }
+
+    /* Register the client in the key -> clients table of the DB, creating
+     * the clients list if this is the first watcher of the key. */
+    watchers = dictFetchValue(c->db->watched_keys,key);
+    if (watchers == NULL) {
+        watchers = listCreate();
+        dictAdd(c->db->watched_keys,key,watchers);
+        incrRefCount(key);
+    }
+    listAddNodeTail(watchers,c);
+
+    /* Add the new key to the list of keys watched by this client */
+    entry = zmalloc(sizeof(*entry));
+    entry->key = key;
+    entry->db = c->db;
+    incrRefCount(key);
+    listAddNodeTail(c->watched_keys,entry);
+}
+
+/* Stop watching every key watched by this client. The REDIS_DIRTY_CAS
+ * flag is not touched: clearing it is up to the caller. */
+void unwatchAllKeys(redisClient *c) {
+    listIter iter;
+    listNode *node;
+
+    if (listLength(c->watched_keys) == 0) return;
+
+    listRewind(c->watched_keys,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        watchedKey *entry = listNodeValue(node);
+        list *watchers;
+        listNode *self;
+
+        /* Remove the client from the list of clients watching the key */
+        watchers = dictFetchValue(entry->db->watched_keys, entry->key);
+        redisAssert(watchers != NULL);
+        self = listSearchKey(watchers,c);
+        listDelNode(watchers,self);
+
+        /* No clients left: drop the key from the per-DB table */
+        if (listLength(watchers) == 0) {
+            dictDelete(entry->db->watched_keys, entry->key);
+        }
+
+        /* Finally remove the entry from the client list and release it */
+        listDelNode(c->watched_keys,node);
+        decrRefCount(entry->key);
+        zfree(entry);
+    }
+}
+
+/* "Touch" a key: every client watching 'key' in 'db' is flagged as
+ * REDIS_DIRTY_CAS, so that its next EXEC will fail. */
+void touchWatchedKey(redisDb *db, robj *key) {
+    list *watchers;
+    listIter iter;
+    listNode *node;
+
+    /* Nothing is watched in this DB at all */
+    if (dictSize(db->watched_keys) == 0) return;
+
+    watchers = dictFetchValue(db->watched_keys, key);
+    if (watchers == NULL) return;
+
+    listRewind(watchers,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        redisClient *watcher = listNodeValue(node);
+
+        watcher->flags |= REDIS_DIRTY_CAS;
+    }
+}
+
+/* On FLUSHDB or FLUSHALL all the watched keys that are present before the
+ * flush but will be deleted as effect of the flushing operation should
+ * be touched. "dbid" is the DB that's getting the flush. -1 if it is