Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

8147 lines (7329 sloc) 282.097 kb
/*
* Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#define REDIS_VERSION "1.3.3"
#include "fmacros.h"
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#define __USE_POSIX199309
#include <signal.h>
#ifdef HAVE_BACKTRACE
#include <execinfo.h>
#include <ucontext.h>
#endif /* HAVE_BACKTRACE */
#include <sys/wait.h>
#include <errno.h>
#include <assert.h>
#include <ctype.h>
#include <stdarg.h>
#include <inttypes.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/uio.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#if defined(__sun)
#include "solarisfixes.h"
#endif
#include "redis.h"
#include "ae.h" /* Event driven programming library */
#include "sds.h" /* Dynamic safe strings */
#include "anet.h" /* Networking the easy way */
#include "dict.h" /* Hash tables */
#include "adlist.h" /* Linked lists */
#include "zmalloc.h" /* total memory usage aware version of malloc/free */
#include "lzf.h" /* LZF compression library */
#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1
/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
this flag will return an error when the 'maxmemory' option is set in the
config file and the server is using more than maxmemory bytes of memory.
In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4
/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */
/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
/* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of
* the first byte to interpret the length:
*
* 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
* 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
* 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
* 11|000000 this means: specially encoded object will follow. The six bits
* number specify the kind of object that follows.
* See the REDIS_RDB_ENC_* defines.
*
* Lengths up to 63 are stored using a single byte, most DB keys, and many
* values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX
/* When a length of a string object stored on disk has the first two bits
* set, the remaining six bits specify a special encoding for the object
* accordingly to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
/* Virtual memory object->where field. */
#define REDIS_VM_MEMORY 0 /* The object is on memory */
#define REDIS_VM_SWAPPED 1 /* The object is on disk */
#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
/* Virtual memory static configuration stuff.
* Check vmFindContiguousPages() to know more about this magic numbers. */
#define REDIS_VM_MAX_NEAR_PAGES 65536
#define REDIS_VM_MAX_RANDOM_JUMP 4096
#define REDIS_VM_MAX_THREADS 32
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
/* The following is the *percentage* of completed I/O jobs to process when the
* handler is called. While Virtual Memory I/O operations are performed by
* threads, these operations must be processed by the main thread when completed
* in order to take effect. */
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16 /* This client is in a MULTI context */
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
#define REDIS_IO_WAIT 64 /* The client is waiting for Virtual Memory I/O */
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */
/* Slave replication state - from the point of view of master
* Note that in SEND_BULK and ONLINE state the slave receives new updates
* in its output queue. In the WAIT_BGSAVE state instead the server is waiting
* to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1
/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024
/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_VERBOSE 1
#define REDIS_NOTICE 2
#define REDIS_WARNING 3
/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),exit(1)))
static void _redisAssert(char *estr, char *file, int line);
/*================================= Data types ============================== */
/* A redis object, that is a type able to hold a string / list / set */
/* The VM object structure: where an object lives in the swap file and when
* it was last accessed.
* NOTE(review): the trailing `vm` declarator below also defines a file-level
* variable named `vm` of this struct type. It looks unintentional -- confirm
* nothing references it before removing it. */
struct redisObjectVM {
off_t page; /* the page at which the object is stored on disk */
off_t usedpages; /* number of pages used on disk */
time_t atime; /* Last access time */
} vm;
/* The actual Redis Object: a reference counted container whose payload is
* reached through `ptr` and described by `type` and `encoding`. */
typedef struct redisObject {
void *ptr; /* Payload; how to read it depends on type and encoding */
unsigned char type; /* Object type, e.g. REDIS_STRING */
unsigned char encoding; /* Payload encoding, e.g. REDIS_ENCODING_RAW */
unsigned char storage; /* If this object is a key, where is the value?
* REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
unsigned char vtype; /* If this object is a key, and value is swapped out,
* this is the type of the swapped out object. */
int refcount; /* Reference count (see incrRefCount / decrRefCount) */
/* VM fields, these are only allocated if VM is active, otherwise the
* object allocation function will just allocate
* sizeof(redisObject) minus sizeof(redisObjectVM), so using
* Redis without VM active will not have any overhead.
* Because of that, this member must stay the last one in the struct. */
struct redisObjectVM vm;
} robj;
/* Macro used to initialize a Redis object allocated on the stack.
 *
 * _var is the robj lvalue to set up and _ptr the payload pointer to store in
 * it. The object becomes a raw-encoded REDIS_STRING with a reference count
 * of 1; the storage field is only written when virtual memory is enabled.
 * Both arguments are parenthesized in the expansion so that expressions such
 * as `*objptr` can be passed safely.
 *
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * NOTE(review): the expansion ends with its own ';' (kept for compatibility
 * with existing call sites), so the macro cannot be used as the body of an
 * unbraced `if` that is followed by `else`. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
    if (server.vm_enabled) (_var).storage = REDIS_VM_MEMORY; \
} while(0);
/* State of a single database: its keyspace plus the auxiliary tables for
* key timeouts and for clients blocked on keys. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
int id; /* presumably the index of this DB -- TODO confirm */
} redisDb;
/* Client MULTI/EXEC state */
/* A single command held in a multiState: its argument vector, the number of
* arguments, and the command table entry it refers to. */
typedef struct multiCmd {
robj **argv;
int argc;
struct redisCommand *cmd;
} multiCmd;
/* The list of commands accumulated by a client for MULTI/EXEC, stored as an
* array of multiCmd together with its length. */
typedef struct multiState {
multiCmd *commands; /* Array of MULTI commands */
int count; /* Total number of MULTI commands */
} multiState;
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct redisClient {
int fd;
redisDb *db;
int dictid;
sds querybuf;
robj **argv, **mbargv;
int argc, mbargc;
int bulklen; /* bulk read len. -1 if not in bulk read mode */
int multibulk; /* multi bulk command format active */
list *reply;
int sentlen;
time_t lastinteraction; /* time of the last interaction, used for timeout */
int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
/* REDIS_MULTI */
int slaveseldb; /* slave selected db, if this client is a slave */
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
robj **blockingkeys; /* The keys we are waiting on to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
int blockingkeysnum; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
} redisClient;
/* One entry of the server's saveparams array.
* NOTE(review): presumably a save point meaning "save after `changes`
* modifications within `seconds` seconds" -- confirm against the code that
* reads server.saveparams. */
struct saveparam {
time_t seconds;
int changes;
};
/* Global server state structure. A single instance, `server`, is defined in
* the Globals section of this file. The fields are grouped into runtime
* state, statistics, configuration, replication, SORT parameters and
* virtual memory state. */
struct redisServer {
int port;
int fd;
redisDb *db;
dict *sharingpool; /* Pool used for object sharing */
unsigned int sharingpoolsize;
long long dirty; /* changes to DB from the last save */
list *clients;
list *slaves, *monitors;
char neterr[ANET_ERR_LEN];
aeEventLoop *el;
int cronloops; /* number of times the cron function run */
list *objfreelist; /* A list of freed objects to avoid malloc() */
time_t lastsave; /* Unix time of last save succeeded */
/* Fields used only for stats */
time_t stat_starttime; /* server start time */
long long stat_numcommands; /* number of processed commands */
long long stat_numconnections; /* number of connections received */
/* Configuration */
int verbosity;
int glueoutputbuf;
int maxidletime;
int dbnum;
int daemonize;
int appendonly;
int appendfsync;
time_t lastfsync;
int appendfd;
int appendseldb;
char *pidfile;
pid_t bgsavechildpid;
pid_t bgrewritechildpid;
sds bgrewritebuf; /* buffer taken by parent during append only rewrite */
struct saveparam *saveparams;
int saveparamslen;
char *logfile;
char *bindaddr;
char *dbfilename;
char *appendfilename;
char *requirepass;
int shareobjects;
int rdbcompression;
/* Replication related */
int isslave;
char *masterauth;
char *masterhost;
int masterport;
redisClient *master; /* client that is master for this slave */
int replstate;
unsigned int maxclients;
unsigned long long maxmemory;
unsigned int blockedclients;
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int sort_alpha;
int sort_bypattern;
/* Virtual memory configuration */
int vm_enabled;
char *vm_swap_file;
off_t vm_page_size;
off_t vm_pages;
unsigned long long vm_max_memory;
/* Virtual memory state */
FILE *vm_fp;
int vm_fd;
off_t vm_next_page; /* Next probably empty page */
off_t vm_near_pages; /* Number of pages allocated sequentially */
unsigned char *vm_bitmap; /* Bitmap of free/used pages */
time_t unixtime; /* Unix time sampled every second. */
/* Virtual memory I/O threads stuff */
/* An I/O thread processes an element taken from the queue of new jobs and
* puts the result of the operation in the list of processed jobs. While the
* job is being processed, it's put on the io_processing queue.
* NOTE(review): the original comment called these lists io_jobs and
* io_done; they presumably correspond to io_newjobs and io_processed
* below -- confirm. */
list *io_newjobs; /* List of VM I/O jobs yet to be processed */
list *io_processing; /* List of VM I/O jobs being processed */
list *io_processed; /* List of VM I/O jobs already processed */
list *io_clients; /* All the clients waiting for SWAP I/O operations */
pthread_mutex_t io_mutex; /* lock to access the I/O job lists (originally documented as io_jobs/io_done/io_thread_job) */
pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
pthread_attr_t io_threads_attr; /* attributes for threads creation */
int io_active_threads; /* Number of running I/O threads */
int vm_max_threads; /* Max number of I/O threads running at the same time */
/* Our main thread is blocked on the event loop, looking for sockets ready
* to be read or written, so when a threaded I/O operation is ready to be
* processed by the main thread, the I/O thread will use a unix pipe to
* awake the main thread. The followings are the two pipe FDs. */
int io_ready_pipe_read;
int io_ready_pipe_write;
/* Virtual memory stats */
unsigned long long vm_stats_used_pages;
unsigned long long vm_stats_swapped_objects;
unsigned long long vm_stats_swapouts;
unsigned long long vm_stats_swapins;
FILE *devnull;
};
/* Signature shared by every command implementation: it receives the client
* that issued the command. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
struct redisCommand {
char *name; /* Command name as listed in cmdTable */
redisCommandProc *proc; /* Function implementing the command */
int arity; /* Argument count; cmdTable also uses negative values --
* NOTE(review): presumably "at least -arity", confirm
* against the code that checks it */
int flags; /* Bitwise OR of REDIS_CMD_* flags */
};
/* Pairs a name with an address stored as an unsigned long.
* NOTE(review): presumably used to map addresses back to function names when
* printing a stack trace -- confirm against the users of this struct. */
struct redisFunctionSym {
char *name;
unsigned long pointer;
};
/* An object together with the value it is compared by: either a numeric
* score or another object, sharing storage in the union `u`.
* NOTE(review): presumably one element of the vector sorted by SORT, with
* the active union member selected by the sort mode -- confirm. */
typedef struct _redisSortObject {
robj *obj;
union {
double score;
robj *cmpobj;
} u;
} redisSortObject;
/* An operation type paired with a pattern object.
* NOTE(review): `type` presumably holds one of the REDIS_SORT_* constants
* (e.g. REDIS_SORT_GET) -- confirm against sortCommand(). */
typedef struct _redisSortOperation {
int type;
robj *pattern;
} redisSortOperation;
/* ZSETs use a specialized version of Skiplists */
/* A skiplist node: an object with its score, a pointer to an array of
* forward links and a single backward link. */
typedef struct zskiplistNode {
struct zskiplistNode **forward; /* presumably one link per level -- TODO confirm */
struct zskiplistNode *backward;
double score;
robj *obj;
} zskiplistNode;
/* The skiplist itself: header and tail nodes, a length and a level.
* NOTE(review): `length` is presumably the element count and `level` the
* highest level currently in use (bounded by ZSKIPLIST_MAXLEVEL) -- confirm
* against zslCreate()/zslInsert(). */
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
/* A sorted set: a hash table used together with a skiplist. */
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
/* Our shared "common" objects */
/* The single instance `shared` is defined right here by the trailing
* declarator.
* NOTE(review): the members are presumably pre-built reply objects created
* once at startup -- confirm against the code that fills `shared`. */
struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
*colon, *nullbulk, *nullmultibulk, *queued,
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
*outofrangeerr, *plus,
*select0, *select1, *select2, *select3, *select4,
*select5, *select6, *select7, *select8, *select9;
} shared;
/* Global vars that are actually used as constants. The following double
* values are used for double on-disk serialization, and are initialized
* at runtime to avoid strange compiler optimizations. */
static double R_Zero, R_PosInf, R_NegInf, R_Nan;
/* VM threaded I/O request message */
#define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */
/* A VM threaded I/O job: which key/value is involved, what has to be done
* (REDIS_IOJOB_*), and the swap file location concerned.
* NOTE(review): the struct tag is spelled `iojon`, which looks like a typo
* for `iojob`; only the typedef name `iojob` is used in the code visible
* here. */
typedef struct iojon {
int type; /* Request type, REDIS_IOJOB_* */
redisDb *db;/* Redis database */
robj *key; /* This I/O request is about swapping this key */
robj *val; /* the value to swap for REDIS_IOJOB_*_SWAP, otherwise this
* field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
off_t page; /* Swap page where to read/write the object */
off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
int canceled; /* True if this command was canceled by blocking side of VM */
pthread_t thread; /* ID of the thread processing this entry */
} iojob;
/*================================ Prototypes =============================== */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static robj *dupStringObject(robj *o);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteIfSwapped(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
static void initClientMultiState(redisClient *c);
static void freeClientMultiState(redisClient *c);
static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
static void unblockClientWaitingData(redisClient *c);
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
static void vmInit(void);
static void vmMarkPagesFree(off_t page, off_t count);
static robj *vmLoadObject(robj *key);
static robj *vmPreviewObject(robj *key);
static int vmSwapOneObjectBlocking(void);
static int vmSwapOneObjectThreaded(void);
static int vmCanSwapOut(void);
static int tryFreeOneObjectFromFreelist(void);
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
static void vmCancelThreadedIOJob(robj *o);
static void lockThreadedIO(void);
static void unlockThreadedIO(void);
static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db);
static void freeIOJob(iojob *j);
static void queueIOJob(iojob *j);
static int vmWriteObjectOnSwap(robj *o, off_t page);
static robj *vmReadObjectFromSwap(off_t page, int type);
static void waitEmptyIOJobsQueue(void);
static void vmReopenSwapFile(void);
static int vmFreePage(off_t page);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
static void multiCommand(redisClient *c);
static void execCommand(redisClient *c);
static void blpopCommand(redisClient *c);
static void brpopCommand(redisClient *c);
/*================================= Globals ================================= */
/* Global vars */
static struct redisServer server; /* server global state */
/* The command table. Every entry is a struct redisCommand initializer in the
* order {name, proc, arity, flags}; the table ends with an all-NULL/zero
* sentinel entry.
* Flags are REDIS_CMD_INLINE or REDIS_CMD_BULK, optionally OR-ed with
* REDIS_CMD_DENYOOM (command refused when over 'maxmemory').
* NOTE(review): negative arity values presumably mean "at least that many
* arguments" -- confirm against the code that validates arity.
* NOTE(review): "smembers" is served by sinterCommand, and "zscore" carries
* REDIS_CMD_DENYOOM -- both look deliberate/odd respectively; confirm. */
static struct redisCommand cmdTable[] = {
{"get",getCommand,2,REDIS_CMD_INLINE},
{"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"del",delCommand,-2,REDIS_CMD_INLINE},
{"exists",existsCommand,2,REDIS_CMD_INLINE},
{"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"mget",mgetCommand,-2,REDIS_CMD_INLINE},
{"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"rpop",rpopCommand,2,REDIS_CMD_INLINE},
{"lpop",lpopCommand,2,REDIS_CMD_INLINE},
{"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
{"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
{"llen",llenCommand,2,REDIS_CMD_INLINE},
{"lindex",lindexCommand,3,REDIS_CMD_INLINE},
{"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
{"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
{"lrem",lremCommand,4,REDIS_CMD_BULK},
{"rpoplpush",rpoplpushcommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"srem",sremCommand,3,REDIS_CMD_BULK},
{"smove",smoveCommand,4,REDIS_CMD_BULK},
{"sismember",sismemberCommand,3,REDIS_CMD_BULK},
{"scard",scardCommand,2,REDIS_CMD_INLINE},
{"spop",spopCommand,2,REDIS_CMD_INLINE},
{"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
{"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"smembers",sinterCommand,2,REDIS_CMD_INLINE},
{"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"zrem",zremCommand,3,REDIS_CMD_BULK},
{"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
{"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
{"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
{"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
{"zcard",zcardCommand,2,REDIS_CMD_INLINE},
{"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
{"select",selectCommand,2,REDIS_CMD_INLINE},
{"move",moveCommand,3,REDIS_CMD_INLINE},
{"rename",renameCommand,3,REDIS_CMD_INLINE},
{"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
{"expire",expireCommand,3,REDIS_CMD_INLINE},
{"expireat",expireatCommand,3,REDIS_CMD_INLINE},
{"keys",keysCommand,2,REDIS_CMD_INLINE},
{"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
{"auth",authCommand,2,REDIS_CMD_INLINE},
{"ping",pingCommand,1,REDIS_CMD_INLINE},
{"echo",echoCommand,2,REDIS_CMD_BULK},
{"save",saveCommand,1,REDIS_CMD_INLINE},
{"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
{"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
{"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
{"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
{"type",typeCommand,2,REDIS_CMD_INLINE},
{"multi",multiCommand,1,REDIS_CMD_INLINE},
{"exec",execCommand,1,REDIS_CMD_INLINE},
{"sync",syncCommand,1,REDIS_CMD_INLINE},
{"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
{"flushall",flushallCommand,1,REDIS_CMD_INLINE},
{"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
{"info",infoCommand,1,REDIS_CMD_INLINE},
{"monitor",monitorCommand,1,REDIS_CMD_INLINE},
{"ttl",ttlCommand,2,REDIS_CMD_INLINE},
{"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
{"debug",debugCommand,-2,REDIS_CMD_INLINE},
{NULL,NULL,0,0}
};
/*============================ Utility functions ============================ */
/* Glob-style pattern matching.
 *
 * Matches the first stringLen bytes of 'string' against the first patternLen
 * bytes of 'pattern'. Neither buffer needs to be NUL terminated: no byte at
 * or past the given length is ever read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of the listed set
 *   [^abc]   one byte that is not in the listed set
 *   [a-z]    one byte in the inclusive range (bounds may be given reversed)
 *   \x       the byte x taken literally (also inside a class)
 *
 * An unterminated class ("[ab") is accepted as if it were closed at the end
 * of the pattern. If 'nocase' is non-zero, literal bytes and ranges are
 * compared after tolower(); escaped bytes inside a class are always compared
 * exactly.
 *
 * Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen > 0) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of stars without looking past the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while(stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte of the string. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped byte: only an escape if a byte follows. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance below lands exactly on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal byte cannot match an exhausted string. */
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String consumed: only trailing stars may remain. */
            while(patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
/* Write a printf-style message to the server log.
 *
 * level is one of REDIS_DEBUG, REDIS_VERBOSE, REDIS_NOTICE, REDIS_WARNING;
 * messages below server.verbosity are dropped before anything is opened.
 * The message goes to stdout when server.logfile is NULL, otherwise it is
 * appended to that file, which is opened and closed on every call. If the
 * file cannot be opened the message is silently discarded.
 *
 * Each line looks like "[pid] day month HH:MM:SS <mark> message", where the
 * mark is one character per level: '.', '-', '*', '#'. */
static void redisLog(int level, const char *fmt, ...) {
    /* One marker per log level, indexed by level. */
    static const char marks[] = ".-*#";
    const int lastmark = (int)sizeof(marks) - 2;
    va_list ap;
    FILE *fp;
    char buf[64];
    time_t now;
    int idx;

    if (level < server.verbosity) return;

    fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
    if (!fp) return;

    /* Clamp so that an unexpected level can never index outside marks. */
    idx = level;
    if (idx < 0) idx = 0;
    if (idx > lastmark) idx = lastmark;

    now = time(NULL);
    strftime(buf,sizeof(buf),"%d %b %H:%M:%S",localtime(&now));
    fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,marks[idx]);
    va_start(ap, fmt);
    vfprintf(fp, fmt, ap);
    va_end(ap);
    fprintf(fp,"\n");
    fflush(fp);

    if (server.logfile) fclose(fp);
}
/*====================== Hash table type implementation ==================== */
/* This is a hash table type that uses the SDS dynamic strings library as
* keys and redis objects as values (objects can hold SDS strings,
* lists, sets). */
/* Dictionary destructor callback that releases 'val' with zfree().
 * The privdata argument is not used. */
static void dictVanillaFree(void *privdata, void *val) {
    DICT_NOTUSED(privdata);

    zfree(val);
}
static void dictListDestructor(void *privdata, void *val)
{
DICT_NOTUSED(privdata);
listRelease((list*)val);
}
/* Key comparison callback for keys that are sds strings.
 * Returns 1 when both strings have the same length and the same bytes,
 * 0 otherwise. The privdata argument is not used. */
static int sdsDictKeyCompare(void *privdata, const void *key1,
                             const void *key2)
{
    int len1, len2;

    DICT_NOTUSED(privdata);
    len1 = sdslen((sds)key1);
    len2 = sdslen((sds)key2);
    /* Different lengths can never be equal; only then compare the bytes. */
    return len1 == len2 && memcmp(key1, key2, len1) == 0;
}
/* Dictionary destructor callback for redis objects: drops one reference
 * with decrRefCount(). A NULL value is ignored, since values of swapped
 * out keys are set to NULL. The privdata argument is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);

    if (val != NULL) {
        decrRefCount(val);
    }
}
/* Key comparison callback for keys that are redis objects: the comparison
 * is delegated to sdsDictKeyCompare() on the two ptr fields. */
static int dictObjKeyCompare(void *privdata, const void *key1,
                             const void *key2)
{
    const robj *left = key1;
    const robj *right = key2;

    return sdsDictKeyCompare(privdata, left->ptr, right->ptr);
}
static unsigned int dictObjHash(const void *key) {
const robj *o = key;
return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
}
static int dictEncObjKeyCompare(void *privdata, const void *key1,
const void *key2)
{
robj *o1 = (robj*) key1, *o2 = (robj*) key2;
int cmp;
o1 = getDecodedObject(o1);
o2 = getDecodedObject(o2);
cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
decrRefCount(o1);
decrRefCount(o2);
return cmp;
}
static unsigned int dictEncObjHash(const void *key) {
robj *o = (robj*) key;
o = getDecodedObject(o);
unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
decrRefCount(o);
return hash;
}
/* Dictionary type for sets: keys are redis objects that are decoded before
 * being hashed or compared (see dictEncObjHash / dictEncObjKeyCompare), and
 * no value destructor is installed. Keys are released through
 * dictRedisObjectDestructor. This is also the type used to create
 * server.sharingpool in initServer(). */
static dictType setDictType = {
dictEncObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictEncObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
NULL /* val destructor */
};
/* Sorted sets hash (note: a skiplist is used in addition to the hash table).
 * Keys are handled exactly as in setDictType; values are freed with
 * dictVanillaFree, i.e. they are plain heap allocations (a double, per the
 * note on the value destructor below). */
static dictType zsetDictType = {
dictEncObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictEncObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
dictVanillaFree /* val destructor of malloc(sizeof(double)) */
};
/* Db->dict: the main keyspace of every database (see initServer()).
 * Keys are hashed and compared directly on their ptr field without any
 * decoding step; both keys and values are redis objects released through
 * dictRedisObjectDestructor. */
static dictType hashDictType = {
dictObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
dictRedisObjectDestructor /* val destructor */
};
/* Db->expires: per-database table of keys with a timeout (see initServer()).
 * Keys are redis objects released through dictRedisObjectDestructor. No
 * value destructor is installed: serverCron() reads the value back by
 * casting it to time_t, so there is nothing to free. */
static dictType keyptrDictType = {
dictObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
NULL /* val destructor */
};
/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP): initServer()
 * creates Db->blockingkeys with it. Values are released with
 * dictListDestructor, keys with dictRedisObjectDestructor. */
static dictType keylistDictType = {
dictObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
dictListDestructor /* val destructor */
};
/* ========================= Random utility functions ======================= */
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear whether it would even
 * be possible to report the condition to the client, since the networking
 * layer itself relies on heap allocation for its send buffers. So we simply
 * log the failure and abort. At least the code will be simpler to read...
 *
 * msg: short description of the operation that failed; it is used as the
 *      prefix of the logged line. This function does not return. */
static void oom(const char *msg) {
    /* No trailing newline here: redisLog() already terminates the line. */
    redisLog(REDIS_WARNING, "%s: Out of memory",msg);
    sleep(1);
    abort();
}
/* ====================== Redis server networking stuff ===================== */
/* Walk the list of connected clients and:
 *  - free every client that has been idle for more than server.maxidletime
 *    seconds (only when a max idle time is configured; slaves and masters
 *    are never timed out);
 *  - for blocked clients whose blocking timeout has passed, reply with a
 *    null multi bulk and unblock them. */
static void closeTimedoutClients(void) {
    listIter iter;
    listNode *node;
    time_t now = time(NULL);

    listRewind(server.clients,&iter);
    while ((node = listNext(&iter)) != NULL) {
        redisClient *client = listNodeValue(node);
        int replication_link =
            (client->flags & (REDIS_SLAVE|REDIS_MASTER)) != 0;
        int idle = server.maxidletime && !replication_link &&
                   (now - client->lastinteraction > server.maxidletime);

        if (idle) {
            redisLog(REDIS_VERBOSE,"Closing idle client");
            freeClient(client);
            continue;
        }
        if (!(client->flags & REDIS_BLOCKED)) continue;
        /* A blockingto of zero is treated as "no timeout". */
        if (client->blockingto != 0 && client->blockingto < now) {
            addReply(client,shared.nullmultibulk);
            unblockClientWaitingData(client);
        }
    }
}
/* Tell whether a hash table is sparse enough to be worth shrinking.
 *
 * Returns 1 when the table is non-empty, has more than DICT_HT_INITIAL_SIZE
 * slots, and its fill percentage (used*100/slots) is below
 * REDIS_HT_MINFILL. Returns 0 otherwise. */
static int htNeedsResize(dict *dict) {
    long long slots = dictSlots(dict);
    long long entries = dictSize(dict);

    if (!slots || !entries) return 0;
    if (!(slots > DICT_HT_INITIAL_SIZE)) return 0;
    return (entries*100/slots < REDIS_HT_MINFILL);
}
/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
* we resize the hash table to save memory */
static void tryResizeHashTables(void) {
int j;
for (j = 0; j < server.dbnum; j++) {
if (htNeedsResize(server.db[j].dict)) {
redisLog(REDIS_VERBOSE,"The hash table %d is too sparse, resize it...",j);
dictResize(server.db[j].dict);
redisLog(REDIS_VERBOSE,"Hash table %d resized.",j);
}
if (htNeedsResize(server.db[j].expires))
dictResize(server.db[j].expires);
}
}
/* A background saving child (BGSAVE) terminated its work. Handle this.
 *
 * statloc: the wait status of the child, as filled in by wait3().
 *
 * On success the dirty counter is reset and the last save time updated.
 * If the child was killed by a signal its temporary file is removed.
 * In every case the child pid is cleared and the slaves waiting for this
 * BGSAVE are notified of the outcome. */
void backgroundSaveDoneHandler(int statloc) {
    int exitcode = WEXITSTATUS(statloc);
    int bysignal = WIFSIGNALED(statloc);
    /* The exit code alone is not enough: it is only meaningful when the
     * child was not terminated by a signal. */
    int success = (!bysignal && exitcode == 0);

    if (success) {
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
        server.dirty = 0;
        server.lastsave = time(NULL);
    } else if (!bysignal) {
        redisLog(REDIS_WARNING, "Background saving error");
    } else {
        redisLog(REDIS_WARNING,
            "Background saving terminated by signal");
        rdbRemoveTempFile(server.bgsavechildpid);
    }
    server.bgsavechildpid = -1;
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb). A save that
     * was interrupted by a signal must be reported as failed. */
    updateSlavesWaitingBgsave(success ? REDIS_OK : REDIS_ERR);
}
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this.
 *
 * statloc: the wait status of the child, as filled in by wait3().
 *
 * On success the changes accumulated by the parent in server.bgrewritebuf
 * are appended to the temp file written by the child, the temp file is
 * renamed over server.appendfilename and, if append only mode is enabled,
 * the server switches to the new file descriptor. Any failure along the way
 * is logged and the rewrite is abandoned. In every case the diff buffer is
 * reset, the temp file removal is attempted and the child pid is cleared. */
void backgroundRewriteDoneHandler(int statloc) {
    int exitcode = WEXITSTATUS(statloc);
    int bysignal = WIFSIGNALED(statloc);

    if (!bysignal && exitcode == 0) {
        int fd;
        char tmpfile[256];
        size_t difflen = sdslen(server.bgrewritebuf);

        redisLog(REDIS_NOTICE,
            "Background append only file rewriting terminated with success");
        /* Now it's time to flush the differences accumulated by the parent */
        snprintf(tmpfile,sizeof(tmpfile),"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
        fd = open(tmpfile,O_WRONLY|O_APPEND);
        if (fd == -1) {
            redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
            goto cleanup;
        }
        /* Flush our data... */
        if (write(fd,server.bgrewritebuf,difflen) != (ssize_t) difflen) {
            redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
            close(fd);
            goto cleanup;
        }
        /* The length is cast so that it always matches the %lu conversion. */
        redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",(unsigned long) difflen);
        /* Now our work is to rename the temp file into the stable file. And
         * switch the file descriptor used by the server for append only. */
        if (rename(tmpfile,server.appendfilename) == -1) {
            redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
            close(fd);
            goto cleanup;
        }
        /* Mission completed... almost */
        redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
        if (server.appendfd != -1) {
            /* If append only is actually enabled... */
            close(server.appendfd);
            server.appendfd = fd;
            fsync(fd);
            server.appendseldb = -1; /* Make sure it will issue SELECT */
            redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
        } else {
            /* If append only is disabled we just generate a dump in this
             * format. Why not? */
            close(fd);
        }
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING, "Background append only file rewriting error");
    } else {
        redisLog(REDIS_WARNING,
            "Background append only file rewriting terminated by signal");
    }
cleanup:
    sdsfree(server.bgrewritebuf);
    server.bgrewritebuf = sdsempty();
    aofRemoveTempFile(server.bgrewritechildpid);
    server.bgrewritechildpid = -1;
}
static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
/* We take a cached value of the unix time in the global state because
* with virtual memory and aging there is to store the current time
* in objects at every object access, and accuracy is not needed.
* To access a global var is faster than calling time(NULL) */
server.unixtime = time(NULL);
/* Show some info about non-empty databases */
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (!(loops % 5) && (used || vkeys)) {
redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
/* We don't want to resize the hash tables while a bacground saving
* is in progress: the saving child is created using fork() that is
* implemented with a copy-on-write semantic in most modern systems, so
* if we resize the HT while there is the saving child at work actually
* a lot of memory movements in the parent will cause a lot of pages
* copied. */
if (server.bgsavechildpid == -1) tryResizeHashTables();
/* Show information about connected clients */
if (!(loops % 5)) {
redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory(),
dictSize(server.sharingpool));
}
/* Close connections of timedout clients */
if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
closeTimedoutClients();
/* Check if a background saving or AOF rewrite in progress terminated */
if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
if (pid == server.bgsavechildpid) {
backgroundSaveDoneHandler(statloc);
} else {
backgroundRewriteDoneHandler(statloc);
}
}
} else {
/* If there is not a background saving in progress check if
* we have to save now */
time_t now = time(NULL);
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
if (server.dirty >= sp->changes &&
now-server.lastsave > sp->seconds) {
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, sp->seconds);
rdbSaveBackground(server.dbfilename);
break;
}
}
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace. */
for (j = 0; j < server.dbnum; j++) {
int expired;
redisDb *db = server.db+j;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
long num = dictSize(db->expires);
time_t now = time(NULL);
expired = 0;
if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
num = REDIS_EXPIRELOOKUPS_PER_CRON;
while (num--) {
dictEntry *de;
time_t t;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
t = (time_t) dictGetEntryVal(de);
if (now > t) {
deleteKey(db,dictGetEntryKey(de));
expired++;
}
}
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
/* Swap a few keys on disk if we are over the memory limit and VM
* is enbled. Try to free objects from the free list first. */
if (vmCanSwapOut()) {
while (server.vm_enabled && zmalloc_used_memory() >
server.vm_max_memory)
{
int retval;
if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
retval = (server.vm_max_threads == 0) ?
vmSwapOneObjectBlocking() :
vmSwapOneObjectThreaded();
if (retval == REDIS_ERR && (loops % 30) == 0 &&
zmalloc_used_memory() >
(server.vm_max_memory+server.vm_max_memory/10))
{
redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
}
/* Note that when using threade I/O we free just one object,
* because anyway when the I/O thread in charge to swap this
* object out will finish, the handler of completed jobs
* will try to swap more objects if we are still out of memory. */
if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
}
}
/* Check if we should connect to a MASTER */
if (server.replstate == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
if (syncWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
}
}
return 1000;
}
/* Build the string objects stored in the global 'shared' structure: protocol
 * replies, common error messages, a few single-character strings and the
 * ten "select N" command strings for N = 0..9. */
static void createSharedObjects(void) {
/* Local shorthands, undefined again before the function ends. */
#define SHARED_SDS(field,text) \
    shared.field = createObject(REDIS_STRING,sdsnew(text))
#define SHARED_SELECT(field,text) \
    shared.field = createStringObject(text,10)

    /* Plain protocol replies */
    SHARED_SDS(crlf,"\r\n");
    SHARED_SDS(ok,"+OK\r\n");
    SHARED_SDS(err,"-ERR\r\n");
    SHARED_SDS(emptybulk,"$0\r\n\r\n");
    SHARED_SDS(czero,":0\r\n");
    SHARED_SDS(cone,":1\r\n");
    SHARED_SDS(nullbulk,"$-1\r\n");
    SHARED_SDS(nullmultibulk,"*-1\r\n");
    SHARED_SDS(emptymultibulk,"*0\r\n");
    SHARED_SDS(pong,"+PONG\r\n");
    SHARED_SDS(queued,"+QUEUED\r\n");

    /* Error replies */
    SHARED_SDS(wrongtypeerr,
        "-ERR Operation against a key holding the wrong kind of value\r\n");
    SHARED_SDS(nokeyerr,
        "-ERR no such key\r\n");
    SHARED_SDS(syntaxerr,
        "-ERR syntax error\r\n");
    SHARED_SDS(sameobjecterr,
        "-ERR source and destination objects are the same\r\n");
    SHARED_SDS(outofrangeerr,
        "-ERR index out of range\r\n");

    /* Single character strings */
    SHARED_SDS(space," ");
    SHARED_SDS(colon,":");
    SHARED_SDS(plus,"+");

    /* "select N" command strings, each exactly 10 bytes long */
    SHARED_SELECT(select0,"select 0\r\n");
    SHARED_SELECT(select1,"select 1\r\n");
    SHARED_SELECT(select2,"select 2\r\n");
    SHARED_SELECT(select3,"select 3\r\n");
    SHARED_SELECT(select4,"select 4\r\n");
    SHARED_SELECT(select5,"select 5\r\n");
    SHARED_SELECT(select6,"select 6\r\n");
    SHARED_SELECT(select7,"select 7\r\n");
    SHARED_SELECT(select8,"select 8\r\n");
    SHARED_SELECT(select9,"select 9\r\n");

#undef SHARED_SELECT
#undef SHARED_SDS
}
static void appendServerSaveParams(time_t seconds, int changes) {
server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
server.saveparams[server.saveparamslen].seconds = seconds;
server.saveparams[server.saveparamslen].changes = changes;
server.saveparamslen++;
}
/* Drop every configured save point: the array is freed and both the pointer
 * and the element count are reset. */
static void resetServerSaveParams() {
    server.saveparamslen = 0;
    zfree(server.saveparams);
    server.saveparams = NULL;
}
/* Fill the global server structure with the default configuration: these
 * are the values in effect unless overridden later (e.g. by the directives
 * handled in loadServerConfig()). Also installs the three default save
 * points and initializes the floating point constants. */
static void initServerConfig() {
    /* Networking and clients */
    server.port = REDIS_SERVERPORT;
    server.bindaddr = NULL;
    server.maxidletime = REDIS_MAXIDLETIME;
    server.maxclients = 0;
    server.blockedclients = 0;
    server.glueoutputbuf = 1;
    server.requirepass = NULL;

    /* Process and logging */
    server.daemonize = 0;
    server.pidfile = "/var/run/redis.pid";
    server.verbosity = REDIS_VERBOSE;
    server.logfile = NULL; /* NULL = log on standard output */

    /* Dataset */
    server.dbnum = REDIS_DEFAULT_DBNUM;
    server.maxmemory = 0;
    server.shareobjects = 0;
    server.sharingpoolsize = 1024;

    /* Snapshotting */
    server.dbfilename = "dump.rdb";
    server.rdbcompression = 1;
    server.saveparams = NULL;
    resetServerSaveParams();
    appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */

    /* Append only file */
    server.appendonly = 0;
    server.appendfilename = "appendonly.aof";
    server.appendfsync = APPENDFSYNC_ALWAYS;
    server.lastfsync = time(NULL);
    server.appendfd = -1;
    server.appendseldb = -1; /* Make sure the first time will not match */

    /* Virtual memory */
    server.vm_enabled = 0;
    server.vm_swap_file = zstrdup("/tmp/redis-%p.vm");
    server.vm_page_size = 256; /* 256 bytes per page */
    server.vm_pages = 1024*1024*100; /* 104 millions of pages */
    server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
    server.vm_max_threads = 4;

    /* Replication related */
    server.isslave = 0;
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.replstate = REDIS_REPL_NONE;

    /* Double constants initialization: R_Zero must be set first because
     * the other three are derived from it. */
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;
}
/* Create the runtime state of the server once the configuration is known:
 * signal dispositions, client lists, shared objects, the event loop, the
 * listening socket, the per-database dictionaries, the statistics counters,
 * the cron time event and the accept handler. When enabled, the append only
 * file is opened and the virtual memory subsystem initialized.
 *
 * Failing to open /dev/null, the listening socket or the append only file
 * is fatal: the error is logged and the process exits with status 1. */
static void initServer() {
    int j;

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSigSegvAction();

    server.devnull = fopen("/dev/null","w");
    if (server.devnull == NULL) {
        /* fopen() reports its failure through errno; server.neterr is only
         * written by the networking calls and holds nothing useful here. */
        redisLog(REDIS_WARNING, "Can't open /dev/null: %s", strerror(errno));
        exit(1);
    }
    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.objfreelist = listCreate();
    createSharedObjects();
    server.el = aeCreateEventLoop();
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    server.sharingpool = dictCreate(&setDictType,NULL);
    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
    if (server.fd == -1) {
        redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
        exit(1);
    }
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&hashDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
    }
    server.cronloops = 0;
    server.bgsavechildpid = -1;
    server.bgrewritechildpid = -1;
    server.bgrewritebuf = sdsempty();
    server.lastsave = time(NULL);
    server.dirty = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_starttime = time(NULL);
    server.unixtime = time(NULL);
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
        acceptHandler, NULL) == AE_ERR) oom("creating file event");

    if (server.appendonly) {
        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.appendfd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    if (server.vm_enabled) vmInit();
}
/* Empty the whole database */
static long long emptyDb() {
int j;
long long removed = 0;
for (j = 0; j < server.dbnum; j++) {
removed += dictSize(server.db[j].dict);
dictEmpty(server.db[j].dict);
dictEmpty(server.db[j].expires);
}
return removed;
}
/* Convert a "yes"/"no" string (compared ignoring case) to an integer.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
/* Load the server configuration from a file.
 *
 * filename: path of the configuration file, or "-" to read it from stdin.
 *
 * The file is read line by line. Lines that are empty or start with '#'
 * once surrounding whitespace is trimmed are skipped. Every other line is
 * split on single spaces: the first token is the directive name (compared
 * ignoring case) and the remaining tokens are its arguments.
 *
 * Any problem is fatal: an unreadable file, a directory that can't be
 * entered, an unknown directive, a wrong number of arguments or an invalid
 * value all terminate the process with exit status 1 after reporting the
 * error.
 *
 * I agree, this is a very rudimentary way to load a configuration...
 * will improve later if the config gets more complex */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines*/
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"verbose")) server.verbosity = REDIS_VERBOSE;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                /* The message lists every level accepted above. */
                err = "Invalid log level. Must be one of debug, verbose, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
            if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) {
            zfree(server.vm_swap_file);
            server.vm_swap_file = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) {
            server.vm_max_memory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) {
            server.vm_page_size = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
            server.vm_pages = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
            server.vm_max_threads = strtoll(argv[1], NULL, 10);
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* 'line' still holds the offending line: it is only freed once a
     * directive has been processed successfully. */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
/* Drop the references held by both argument vectors of the client (argv and
 * mbargv) and reset the two counters to zero. The arrays themselves are not
 * freed. */
static void freeClientArgv(redisClient *c) {
    int i;

    for (i = 0; i < c->argc; i++) {
        decrRefCount(c->argv[i]);
    }
    for (i = 0; i < c->mbargc; i++) {
        decrRefCount(c->mbargv[i]);
    }
    c->mbargc = 0;
    c->argc = 0;
}
/* Release a client and every resource attached to it: query buffer, file
 * events, reply list, argument vectors, socket, its membership in the
 * server lists (clients, VM waiters, slaves or monitors), the replication
 * dump file descriptor if a bulk transfer was in progress, the MULTI state
 * and finally the client structure itself.
 *
 * When the client being freed is the master, the replication state is set
 * back to REDIS_REPL_CONNECT. */
static void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* to call
     * unblockClientWaitingData() to avoid processInputBuffer() will get
     * called. Also it is important to remove the file events after
     * this, because this call adds the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClientWaitingData(c);

    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Remove from the list of clients */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* Remove from the list of clients waiting for VM operations */
    if (server.vm_enabled && listLength(c->io_keys)) {
        ln = listSearchKey(server.io_clients,c);
        if (ln) listDelNode(server.io_clients,ln);
    }
    /* The list of keys is released exactly once, whatever the VM state:
     * releasing it inside the branch above as well would free it twice. */
    listRelease(c->io_keys);
    /* Other cleanup */
    if (c->flags & REDIS_SLAVE) {
        list *l;

        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Monitors carry the slave flag too but live in their own list. */
        l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
#define GLUEREPLY_UP_TO (1024)
/* Merge the leading objects of the client reply list into a single object.
 *
 * Objects are taken from the head of the list for as long as their combined
 * length fits in GLUEREPLY_UP_TO bytes; they are removed from the list and
 * replaced by one new string object holding the concatenation, added at
 * the head. If not even the first object fits, the list is left untouched. */
static void glueReplyBuffersIfNeeded(redisClient *c) {
    char glued[GLUEREPLY_UP_TO];
    int used = 0;
    listIter iter;
    listNode *node;
    robj *obj;

    listRewind(c->reply,&iter);
    while ((node = listNext(&iter)) != NULL) {
        int len;

        obj = node->value;
        len = sdslen(obj->ptr);
        if (used + len > GLUEREPLY_UP_TO) {
            /* Nothing was consumed so far: there is nothing to glue. */
            if (used == 0) return;
            break;
        }
        memcpy(glued+used,obj->ptr,len);
        used += len;
        listDelNode(c->reply,node);
    }
    /* Put the glued data back as the new first element of the list. */
    obj = createObject(REDIS_STRING,sdsnewlen(glued,used));
    listAddNodeHead(c->reply,obj);
}
/* Writable-event handler: send as much of the pending reply list of the
 * client (passed as privdata) as possible on fd.
 *
 * c->sentlen tracks how many bytes of the object at the head of the reply
 * list were already sent, so a partially written object is resumed on the
 * next call. When the reply list becomes empty the writable event is
 * removed. On a write error other than EAGAIN the client is freed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
robj *o;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
/* Use writev() if we have enough buffers to send. This path is taken only
 * when output gluing is disabled and the client is not the master. */
if (!server.glueoutputbuf &&
listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
!(c->flags & REDIS_MASTER))
{
sendReplyToClientWritev(el, fd, privdata, mask);
return;
}
while(listLength(c->reply)) {
/* With gluing enabled, try to merge the head objects into a single one
 * before writing. */
if (server.glueoutputbuf && listLength(c->reply) > 1)
glueReplyBuffersIfNeeded(c);
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
/* Empty objects carry nothing to send: just drop them. */
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
continue;
}
if (c->flags & REDIS_MASTER) {
/* Don't reply to a master: account the bytes as written without
 * actually writing them. */
nwritten = objlen - c->sentlen;
} else {
nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
/* Stop on error or when nothing was written; a -1 is examined after
 * the loop. */
if (nwritten <= 0) break;
}
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
}
/* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
 * bytes, in a single threaded server it's a good idea to serve
 * other clients as well, even if a very large request comes from
 * super fast link that is always able to accept data (in real world
 * scenario think about 'KEYS *' against the loopback interface) */
if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
}
if (nwritten == -1) {
if (errno == EAGAIN) {
/* The socket can't accept more data right now: not an error. */
nwritten = 0;
} else {
redisLog(REDIS_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return;
}
}
/* Sending data counts as activity for the idle timeout. */
if (totwritten > 0) c->lastinteraction = time(NULL);
if (listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
}
}
/* Variant of sendReplyToClient() that sends several reply objects with a
 * single writev() call.
 *
 * At every iteration up to REDIS_WRITEV_IOVEC_COUNT objects from the head
 * of the reply list are described in an iovec array and written at once;
 * the objects that were fully sent are then removed from the list, while
 * c->sentlen records how much of a partially sent head object went out.
 * When the reply list becomes empty the writable event is removed. On a
 * write error other than EAGAIN the client is freed. */
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
{
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen, willwrite;
robj *o;
struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
int offset, ion = 0;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
listNode *node;
while (listLength(c->reply)) {
/* Only the first object may have been partially sent already. */
offset = c->sentlen;
ion = 0;
willwrite = 0;
/* fill-in the iov[] array */
for(node = listFirst(c->reply); node; node = listNextNode(node)) {
o = listNodeValue(node);
objlen = sdslen(o->ptr);
/* Stop at the first object whose remaining bytes, added to what was
 * written so far, would exceed the per-event limit. */
if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
break;
if(ion == REDIS_WRITEV_IOVEC_COUNT)
break; /* no more iovecs */
iov[ion].iov_base = ((char*)o->ptr) + offset;
iov[ion].iov_len = objlen - offset;
willwrite += objlen - offset;
offset = 0; /* just for the first item */
ion++;
}
/* Nothing could be queued for this round: we are done for this event. */
if(willwrite == 0)
break;
/* write all collected blocks at once */
if((nwritten = writev(fd, iov, ion)) < 0) {
if (errno != EAGAIN) {
redisLog(REDIS_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return;
}
/* EAGAIN: the socket can't accept more data right now. */
break;
}
totwritten += nwritten;
offset = c->sentlen;
/* remove written robjs from c->reply */
while (nwritten && listLength(c->reply)) {
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
if(nwritten >= objlen - offset) {
/* The head object went out completely. */
listDelNode(c->reply, listFirst(c->reply));
nwritten -= objlen - offset;
c->sentlen = 0;
} else {
/* partial write */
c->sentlen += nwritten;
break;
}
offset = 0;
}
}
/* Sending data counts as activity for the idle timeout. */
if (totwritten > 0)
c->lastinteraction = time(NULL);
if (listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
}
}
static struct redisCommand *lookupCommand(char *name) {
int j = 0;
while(cmdTable[j].name != NULL) {
if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
j++;
}
return NULL;
}
/* resetClient() prepares the client to process the next command: the
 * arguments of the previous command are released and the bulk / multi bulk
 * parsing state is set back to its initial values. */
static void resetClient(redisClient *c) {
    freeClientArgv(c);
    c->multibulk = 0;
    c->bulklen = -1;
}
/* call() is the core of Redis execution of a command: it runs the command
 * procedure and then propagates the command.
 *
 * A command that changed server.dirty is fed to the append only file (when
 * enabled) and to the slaves (when there are any). Every command is fed to
 * the monitors, if any. Finally the command counter is incremented. */
static void call(redisClient *c, struct redisCommand *cmd) {
    long long before = server.dirty;

    cmd->proc(c);
    if (server.appendonly && server.dirty != before) {
        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
    }
    if (server.dirty != before && listLength(server.slaves)) {
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    }
    if (listLength(server.monitors)) {
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    }
    server.stat_numcommands++;
}
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
struct redisCommand *cmd;
/* Free some memory if needed (maxmemory setting) */
if (server.maxmemory) freeMemoryIfNeeded();
/* Handle the multi bulk command type. This is an alternative protocol
* supported by Redis in order to receive commands that are composed of
* multiple binary-safe "bulk" arguments. The latency of processing is
* a bit higher but this allows things like multi-sets, so if this
* protocol is used only for MSET and similar commands this is a big win. */
if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
if (c->multibulk <= 0) {
resetClient(c);
return 1;
} else {
decrRefCount(c->argv[c->argc-1]);
c->argc--;
return 1;
}
} else if (c->multibulk) {
if (c->bulklen == -1) {
if (((char*)c->argv[0]->ptr)[0] != '$') {
addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
resetClient(c);
return 1;
} else {
int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
decrRefCount(c->argv[0]);
if (bulklen < 0 || bulklen > 1024*1024*1024) {
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c);
return 1;
}
c->argc--;
c->bulklen = bulklen+2; /* add two bytes for CR+LF */
return 1;
}
} else {
c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
c->mbargv[c->mbargc] = c->argv[0];
c->mbargc++;
c->argc--;
c->multibulk--;
if (c->multibulk == 0) {
robj **auxargv;
int auxargc;
/* Here we need to swap the multi-bulk argc/argv with the
* normal argc/argv of the client structure. */
auxargv = c->argv;
c->argv = c->mbargv;
c->mbargv = auxargv;
auxargc = c->argc;
c->argc = c->mbargc;
c->mbargc = auxargc;
/* We need to set bulklen to something different than -1
* in order for the code below to process the command without
* to try to read the last argument of a bulk command as
* a special argument. */
c->bulklen = 0;
/* continue below and process the command */
} else {
c->bulklen = -1;
return 1;
}
}
}
/* -- end of multi bulk commands processing -- */
/* The QUIT command is handled as a special case. Normal command
* procs are unable to close the client connection safely */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
freeClient(c);
return 0;
}
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
addReplySds(c,
sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
(char*)c->argv[0]->ptr));
resetClient(c);
return 1;
} else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
(c->argc < -cmd->arity)) {
addReplySds(c,
sdscatprintf(sdsempty(),
"-ERR wrong number of arguments for '%s' command\r\n",
cmd->name));
resetClient(c);
return 1;
} else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
resetClient(c);
return 1;
} else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
int bulklen = atoi(c->argv[c->argc-1]->ptr);
decrRefCount(c->argv[c->argc-1]);
if (bulklen < 0 || bulklen > 1024*1024*1024) {
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c);
return 1;
}
c->argc--;
c->bulklen = bulklen+2; /* add two bytes for CR+LF */
/* It is possible that the bulk read is already in the
* buffer. Check this condition and handle it accordingly.
* This is just a fast path, alternative to call processInputBuffer().
* It's a good idea since the code is small and this condition
* happens most of the times. */
if ((signed)sdslen(c->querybuf) >= c->bulklen) {
c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
c->argc++;
c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
} else {
return 1;
}
}
/* Let's try to share objects on the command arguments vector */
if (server.shareobjects) {
int j;
for(j = 1; j < c->argc; j++)
c->argv[j] = tryObjectSharing(c->argv[j]);
}
/* Let's try to encode the bulk object to save space. */
if (cmd->flags & REDIS_CMD_BULK)
tryObjectEncoding(c->argv[c->argc-1]);
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
resetClient(c);
return 1;
}
/* Exec the command */
if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
queueMultiCommand(c,cmd);
addReply(c,shared.queued);
} else {
call(c,cmd);
}
/* Prepare the client for the next command */
if (c->flags & REDIS_CLOSE) {
freeClient(c);
return 0;
}
resetClient(c);
return 1;
}
/* Queue the command argv/argc, executed against database 'dictid', on the
 * reply list of every client in 'slaves' (the same function is used for
 * the monitors list). The command is rebuilt as a sequence of objects:
 * arguments separated by shared.space, terminated by shared.crlf, with a
 * "<len>\r\n" object inserted before the last argument of bulk commands.
 * A SELECT is queued first for clients whose slaveseldb differs. */
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    /* (args*2)+1 is enough room for args, spaces, newlines */
    robj *static_outv[REDIS_STATIC_ARGS*2+1];
    robj **outv = static_outv;
    int outc = 0, i;
    listIter li;
    listNode *ln;

    /* Fall back to the heap only when the stack vector is too small. */
    if (argc > REDIS_STATIC_ARGS)
        outv = zmalloc(sizeof(robj*)*(argc*2+1));

    for (i = 0; i < argc; i++) {
        if (i > 0) outv[outc++] = shared.space;
        if (i == argc-1 && (cmd->flags & REDIS_CMD_BULK)) {
            /* Bulk payloads are preceded by their length. The object
             * starts with refcount 0 so the incr/decr pair below is
             * what owns (and finally frees) it. */
            robj *lenobj = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),"%lu\r\n",
                    (unsigned long) stringObjectLen(argv[i])));

            lenobj->refcount = 0;
            outv[outc++] = lenobj;
        }
        outv[outc++] = argv[i];
    }
    outv[outc++] = shared.crlf;

    /* Take a reference on everything now and drop it at the end, so the
     * objects are released even if no client ends up being fed. */
    for (i = 0; i < outc; i++) incrRefCount(outv[i]);

    listRewind(slaves,&li);
    while ((ln = listNext(&li)) != NULL) {
        redisClient *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed all the other slaves, MONITORs and so on */
        if (slave->slaveseldb != dictid) {
            robj *selectcmd;

            switch (dictid) {
            case 0: selectcmd = shared.select0; break;
            case 1: selectcmd = shared.select1; break;
            case 2: selectcmd = shared.select2; break;
            case 3: selectcmd = shared.select3; break;
            case 4: selectcmd = shared.select4; break;
            case 5: selectcmd = shared.select5; break;
            case 6: selectcmd = shared.select6; break;
            case 7: selectcmd = shared.select7; break;
            case 8: selectcmd = shared.select8; break;
            case 9: selectcmd = shared.select9; break;
            default:
                /* No preallocated object for this DB: build one. */
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),"select %d\r\n",dictid));
                selectcmd->refcount = 0;
                break;
            }
            addReply(slave,selectcmd);
            slave->slaveseldb = dictid;
        }
        for (i = 0; i < outc; i++) addReply(slave,outv[i]);
    }

    for (i = 0; i < outc; i++) decrRefCount(outv[i]);
    if (outv != static_outv) zfree(outv);
}
/* Parse as much of the client query buffer as possible.
 *
 * When c->bulklen is -1 the function looks for a newline-terminated line,
 * splits it on spaces into c->argv/c->argc and hands it to
 * processCommand(). Otherwise c->bulklen bytes (payload plus CRLF) are
 * expected: once they are all buffered the payload is appended as the last
 * argument and the command is processed. After every processed command the
 * function loops while the client is still valid and data is left. */
static void processInputBuffer(redisClient *c) {
again:
    /* Before processing the input buffer, make sure the client is not
     * waiting for a blocking operation such as BLPOP. Note that on the first
     * iteration the client is never blocked, otherwise processInputBuffer
     * would not be called at all, but after the execution of the first
     * commands in the input buffer the client may be blocked, and the
     * "goto again" will try to reiterate. The following line makes it
     * return asap. */
    if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
    if (c->bulklen == -1) {
        /* Read the first line of the query */
        char *p = strchr(c->querybuf,'\n');
        size_t querylen;

        if (p) {
            sds query, *argv;
            int argc, j;

            query = c->querybuf;
            c->querybuf = sdsempty();
            querylen = 1+(p-(query));
            if (sdslen(query) > querylen) {
                /* leave data after the first line of the query in the buffer */
                c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
            }
            *p = '\0'; /* remove "\n" */
            /* Strip "\r" if any. The p > query test matters when the line
             * is a lone "\n": p-1 would then point before the start of
             * the string, which must be neither read nor written. */
            if (p > query && *(p-1) == '\r') *(p-1) = '\0';
            sdsupdatelen(query);

            /* Now we can split the query in arguments */
            argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
            sdsfree(query);

            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*argc);

            /* Keep only non-empty tokens; each kept sds string is now
             * referenced by the object created around it. */
            for (j = 0; j < argc; j++) {
                if (sdslen(argv[j])) {
                    c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                    c->argc++;
                } else {
                    sdsfree(argv[j]);
                }
            }
            zfree(argv);
            if (c->argc) {
                /* Execute the command. If the client is still valid
                 * after processCommand() returns and there is something
                 * in the query buffer try to process the next command. */
                if (processCommand(c) && sdslen(c->querybuf)) goto again;
            } else {
                /* Nothing to process, argc == 0. Just process the query
                 * buffer if it's not empty or return to the caller */
                if (sdslen(c->querybuf)) goto again;
            }
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            /* No newline seen and the buffer reached the size limit. */
            redisLog(REDIS_VERBOSE, "Client protocol error");
            freeClient(c);
            return;
        }
    } else {
        /* Bulk read handling. Note that if we are at this point
           the client already sent a command terminated with a newline,
           we are reading the bulk data that is actually the last
           argument of the command. */
        int qbl = sdslen(c->querybuf);

        if (c->bulklen <= qbl) {
            /* Copy everything but the final CRLF as final argument */
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
            /* Process the command. If the client is still valid after
             * the processing and there is more data in the buffer
             * try to parse it. */
            if (processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        }
    }
}
/* Readable-event handler for a client socket. Reads up to REDIS_IOBUF_LEN
 * bytes from 'fd', appends them to the client query buffer, refreshes the
 * last interaction time and runs the input parser. The client is freed on
 * end of stream or on a read error other than EAGAIN. 'privdata' is the
 * redisClient; 'el' and 'mask' are unused. */
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf[REDIS_IOBUF_LEN];
    int nread;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nread = read(fd, buf, REDIS_IOBUF_LEN);
    if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread == -1) {
        /* EAGAIN just means there is nothing to read right now. */
        if (errno == EAGAIN) return;
        redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
        freeClient(c);
        return;
    }

    c->querybuf = sdscatlen(c->querybuf, buf, nread);
    c->lastinteraction = time(NULL);
    processInputBuffer(c);
}
/* Point the client at database number 'id'. Returns REDIS_OK on success,
 * or REDIS_ERR (leaving c->db untouched) when 'id' is outside the range
 * [0, server.dbnum). */
static int selectDb(redisClient *c, int id) {
    if (id >= 0 && id < server.dbnum) {
        c->db = &server.db[id];
        return REDIS_OK;
    }
    return REDIS_ERR;
}
/* Dup method installed on the client reply list: "duplicating" a reply
 * object means taking one more reference to it. The object itself must be
 * returned as the duplicated value; returning 0 hands a null value back to
 * whoever asked for the copy while still leaving the extra reference
 * taken. */
static void *dupClientReplyValue(void *o) {
    incrRefCount((robj*)o);
    return o;
}
/* Allocate and initialize the client state for the connected socket 'fd':
 * the socket is put in non-blocking, no-delay mode, the client starts on
 * DB 0 with empty buffers, and a readable event is registered so that
 * readQueryFromClient() gets called. On success the client is appended to
 * server.clients and returned; NULL is returned on failure. */
static redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(*c));

    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    if (!c) return NULL;

    /* Connection and database. */
    selectDb(c,0);
    c->fd = fd;
    c->flags = 0;
    c->authenticated = 0;
    c->lastinteraction = time(NULL);
    c->replstate = REDIS_REPL_NONE;

    /* Query parsing state. */
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->bulklen = -1;
    c->multibulk = 0;
    c->mbargc = 0;
    c->mbargv = NULL;

    /* Output state: the reply list holds references to objects. */
    c->sentlen = 0;
    c->reply = listCreate();
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);

    /* Blocking operations and I/O keys state. */
    c->blockingkeys = NULL;
    c->blockingkeysnum = 0;
    c->io_keys = listCreate();
    listSetFreeMethod(c->io_keys,decrRefCount);

    /* NOTE(review): on failure freeClient() runs before the client is
     * linked into server.clients and before initClientMultiState();
     * confirm freeClient() copes with a client in that state. */
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}
/* Append 'obj' to the client reply list.
 *
 * When the list is empty and the client replication state is
 * REDIS_REPL_NONE or REDIS_REPL_ONLINE, a writable event calling
 * sendReplyToClient() is installed first; if that fails the reply is
 * dropped. The object queued is the one returned by getDecodedObject(). */
static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0) {
        int writable = (c->replstate == REDIS_REPL_NONE ||
                        c->replstate == REDIS_REPL_ONLINE);

        if (writable &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                              sendReplyToClient, c) == AE_ERR)
        {
            return;
        }
    }

    /* With VM enabled, objects not in memory are never queued directly:
     * a private copy is queued instead. */
    if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
        obj = dupStringObject(obj);
        obj->refcount = 0; /* getDecodedObject() will increment the refcount */
    }
    listAddNodeTail(c->reply,getDecodedObject(obj));
}
/* Queue the sds string 's' as a reply for the client. The string is
 * wrapped in a temporary string object; the local reference is dropped
 * right after addReply() has been called. */
static void addReplySds(redisClient *c, sds s) {
    robj *wrapper;

    wrapper = createObject(REDIS_STRING,s);
    addReply(c,wrapper);
    decrRefCount(wrapper);
}
/* Reply with the double 'd' formatted as "%.17g" and sent in the form
 * "$<len>\r\n<digits>\r\n". */
static void addReplyDouble(redisClient *c, double d) {
    char buf[128];
    unsigned long digits;
    sds reply;

    snprintf(buf,sizeof(buf),"%.17g",d);
    digits = (unsigned long) strlen(buf);
    reply = sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",digits,buf);
    addReplySds(c,reply);
}
/* Reply with "$<len>\r\n" where <len> is the number of bytes 'obj' takes
 * when sent as a string: the sds length for raw objects, otherwise the
 * number of characters of the radix 10 representation of the long stored
 * in obj->ptr (sign included). */
static void addReplyBulkLen(redisClient *c, robj *obj) {
    size_t len;

    if (obj->encoding == REDIS_ENCODING_RAW) {
        len = sdslen(obj->ptr);
    } else {
        long n = (long)obj->ptr;

        /* Compute how many bytes this integer takes as a radix 10 string:
         * one for the first digit, plus one for the minus sign if any.
         * The value is deliberately NOT negated: -LONG_MIN overflows, and
         * division truncates toward zero, so the loop below counts the
         * digits of a negative number just as well. */
        len = (n < 0) ? 2 : 1;
        while ((n /= 10) != 0) len++;
    }
    addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
}
/* Readable-event handler for the listening socket 'fd': accepts one
 * connection and creates the client state for it. If the maxclients
 * limit is exceeded an error is written to the new client, which is then
 * freed. 'el', 'privdata' and 'mask' are unused. */
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[128];
    int cport;
    int cfd;
    redisClient *c;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    cfd = anetAccept(server.neterr, fd, cip, &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);

    c = createClient(cfd);
    if (c == NULL) {
        redisLog(REDIS_WARNING,"Error allocating resoures for the client");
        close(cfd); /* May be already closed, just ignore errors */
        return;
    }

    /* Common case: no limit configured, or still within the limit. */
    if (!server.maxclients || listLength(server.clients) <= server.maxclients) {
        server.stat_numconnections++;
        return;
    }

    /* The maxclients directive is set and this is one client too many:
     * close the connection. The client is created before checking this
     * condition because at this point the socket is already in
     * non-blocking mode, so the error can be sent for free using the
     * kernel I/O. */
    {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        freeClient(c);
    }
}
/* ======================= Redis objects implementation ===================== */
/* Create an object of the given type wrapping 'ptr', raw-encoded and with
 * a reference count of 1. The object structure is taken from
 * server.objfreelist when one is available there, otherwise it is
 * allocated. With VM enabled the free list is accessed under
 * server.obj_freelist_mutex and the VM fields are initialized too; with
 * VM disabled the allocation leaves out the trailing struct
 * redisObjectVM. */
static robj *createObject(int type, void *ptr) {
    robj *o;
    int recycled = 0;

    if (server.vm_enabled) pthread_mutex_lock(&server.obj_freelist_mutex);
    if (listLength(server.objfreelist)) {
        listNode *head = listFirst(server.objfreelist);

        o = listNodeValue(head);
        listDelNode(server.objfreelist,head);
        recycled = 1;
    }
    /* The lock only protects the free list, release it before allocating. */
    if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex);

    if (!recycled) {
        if (server.vm_enabled)
            o = zmalloc(sizeof(*o));
        else
            o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
    }

    o->type = type;
    o->encoding = REDIS_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;
    if (server.vm_enabled) {
        /* Note that this code may run in the context of an I/O thread
         * and accessing server.unixtime in theory is an error
         * (no locks). But in practice this is safe, and even if we read
         * garbage Redis will not fail, as it's just a statistical info */
        o->vm.atime = server.unixtime;
        o->storage = REDIS_VM_MEMORY;
    }
    return o;
}
static robj *createStringObject(char *ptr, size_t len) {
return createObject(REDIS_STRING,sdsnewlen(ptr,len));
}
/* Return a new string object with the same content as 'o', which must be
 * raw-encoded (asserted). */
static robj *dupStringObject(robj *o) {
    sds src;

    assert(o->encoding == REDIS_ENCODING_RAW);
    src = o->ptr;
    return createStringObject(src,sdslen(src));
}
/* Create an empty list object. The underlying list releases its values
 * with decrRefCount(). */
static robj *createListObject(void) {
    list *values;

    values = listCreate();
    listSetFreeMethod(values,decrRefCount);
    return createObject(REDIS_LIST,values);
}
/* Create an empty set object, backed by a dict of type setDictType. */
static robj *createSetObject(void) {
    return createObject(REDIS_SET,dictCreate(&setDictType,NULL));
}
/* Create an empty sorted set object: a zset structure holding both a dict
 * (zsetDictType) and a skip list created with zslCreate(). */
static robj *createZsetObject(void) {
    zset *zs;

    zs = zmalloc(sizeof(*zs));
    zs->dict = dictCreate(&zsetDictType,NULL);
    zs->zsl = zslCreate();
    return createObject(REDIS_ZSET,zs);
}
/* Release the payload of a string object. Only raw-encoded objects own an
 * sds string; for any other encoding there is nothing to free here. */
static void freeStringObject(robj *o) {
    if (o->encoding != REDIS_ENCODING_RAW) return;
    sdsfree(o->ptr);
}
/* Release the list held by a list object. */
static void freeListObject(robj *o) {
    list *l = (list*) o->ptr;

    listRelease(l);
}
/* Release the dict held by a set object. */
static void freeSetObject(robj *o) {
    dict *d = (dict*) o->ptr;

    dictRelease(d);
}
/* Release a sorted set payload: first the dict, then the skip list and
 * finally the zset structure holding both. */
static void freeZsetObject(robj *o) {
    zset *sortedset = o->ptr;

    dictRelease(sortedset->dict);
    zslFree(sortedset->zsl);
    zfree(sortedset);
}
/* Release the dict held by a hash object. */
static void freeHashObject(robj *o) {
    dict *d = (dict*) o->ptr;

    dictRelease(d);
}
/* Take one more reference to 'o'. With VM enabled this is only allowed
 * for objects whose storage is REDIS_VM_MEMORY (asserted). */
static void incrRefCount(robj *o) {
    redisAssert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
    o->refcount += 1;
}
/* Drop one reference to the object 'obj' (passed as void* so the function
 * can be installed as a list free method).
 *
 * With VM enabled, an object whose storage is REDIS_VM_SWAPPED or
 * REDIS_VM_LOADING is released immediately: it must be a string with a
 * single reference, its swap pages are marked free and a pending load is
 * cancelled. In every other case the payload is freed only when the
 * reference count reaches zero. Released structures go back to
 * server.objfreelist unless the list is over REDIS_OBJFREELIST_MAX or the
 * insertion fails, in which case they are freed. */
static void decrRefCount(void *obj) {
    robj *o = obj;

    /* Object is a key of a swapped out value, or in the process of being
     * loaded. */
    if (server.vm_enabled &&
        (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
    {
        redisAssert(o->refcount == 1);
        if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj);
        redisAssert(o->type == REDIS_STRING);
        freeStringObject(o);
        vmMarkPagesFree(o->vm.page,o->vm.usedpages);

        pthread_mutex_lock(&server.obj_freelist_mutex);
        if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
            !listAddNodeHead(server.objfreelist,o))
        {
            zfree(o);
        }
        pthread_mutex_unlock(&server.obj_freelist_mutex);
        server.vm_stats_swapped_objects--;
        return;
    }

    /* Object is in memory, or in the process of being swapped out:
     * nothing more to do while other references exist. */
    if (--(o->refcount) != 0) return;

    if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
        vmCancelThreadedIOJob(obj);

    switch (o->type) {
    case REDIS_STRING: freeStringObject(o); break;
    case REDIS_LIST: freeListObject(o); break;
    case REDIS_SET: freeSetObject(o); break;
    case REDIS_ZSET: freeZsetObject(o); break;
    case REDIS_HASH: freeHashObject(o); break;
    default: redisAssert(0 != 0); break;
    }

    if (server.vm_enabled) pthread_mutex_lock(&server.obj_freelist_mutex);
    if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
        !listAddNodeHead(server.objfreelist,o))
    {
        zfree(o);
    }
    if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex);
}
/* Return the value stored at 'key' in 'db', or NULL if the key does not
 * exist. With VM enabled the key object stored in the dict is inspected:
 * if the value is in memory (or being swapped out, in which case the swap
 * is cancelled) the key access time is refreshed; otherwise the value is
 * loaded with vmLoadObject() and stored back in the dict entry. */
static robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key);
    robj *dbkey, *val;

    if (de == NULL) return NULL;

    /* The object stored as dict key carries the VM state, not the
     * 'key' argument used for the lookup. */
    dbkey = dictGetEntryKey(de);
    val = dictGetEntryVal(de);
    if (!server.vm_enabled) return val;

    if (dbkey->storage == REDIS_VM_MEMORY ||
        dbkey->storage == REDIS_VM_SWAPPING)
    {
        /* If we were swapping the object out, stop it, this key
         * was requested. */
        if (dbkey->storage == REDIS_VM_SWAPPING)
            vmCancelThreadedIOJob(dbkey);
        /* Update the access time of the key for the aging algorithm. */
        dbkey->vm.atime = server.unixtime;
    } else {
        /* Our value was swapped on disk. Bring it at home. */
        redisAssert(val == NULL);
        val = vmLoadObject(dbkey);
        dictGetEntryVal(de) = val;
    }
    return val;
}
/* Lookup used by read operations: expireIfNeeded() is called on the key
 * before the actual lookup. Returns the value or NULL. */
static robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;

    expireIfNeeded(db,key);
    val = lookupKey(db,key);
    return val;
}
/* Lookup used by write operations: deleteIfVolatile() is called on the
 * key before the actual lookup. Returns the value or NULL. */
static robj *lookupKeyWrite(redisDb *db, robj *key) {
    robj *val;

    deleteIfVolatile(db,key);
    val = lookupKey(db,key);
    return val;
}
/* Remove 'key' from the database and, when the expires dict is not empty,
 * from the expires dict as well. Returns 1 if the key was found and
 * deleted from the main dict, 0 otherwise. */
static int deleteKey(redisDb *db, robj *key) {
    int deleted;

    /* We need to protect key from destruction: after the first dictDelete()
     * it may happen that 'key' is no longer valid if we don't increment
     * its count. This may happen when we get the object reference directly
     * from the hash table with dictRandomKey() or dict iterators */
    incrRefCount(key);
    if (dictSize(db->expires) != 0) dictDelete(db->expires,key);
    deleted = (dictDelete(db->dict,key) == DICT_OK);
    decrRefCount(key);
    return deleted;
}
/* Try to share an object against the shared objects pool.
 *
 * If an equal object is already in the pool, its hit counter is
 * incremented, the reference to 'o' is dropped and the pooled object is
 * returned with one more reference. Otherwise 'o' itself is returned,
 * possibly after being added to the pool. NULL objects, or any object
 * when sharing is disabled, are returned untouched. */
static robj *tryObjectSharing(robj *o) {
    struct dictEntry *de;
    unsigned long c;

    if (o == NULL || server.shareobjects == 0) return o;
    redisAssert(o->type == REDIS_STRING);

    de = dictFind(server.sharingpool,o);
    if (de != NULL) {
        /* Hit: bump the counter and swap 'o' for the pooled object. */
        robj *shared = dictGetEntryKey(de);

        c = ((unsigned long) dictGetEntryVal(de))+1;
        dictGetEntryVal(de) = (void*) c;
        incrRefCount(shared);
        decrRefCount(o);
        return shared;
    }

    /* Miss. Here we are using a stream algorithm: every time an object is
     * shared we increment its count, every time there is a miss we
     * decrement the counter of a random object. If this object reaches
     * zero we remove the object and put the current object instead. */
    c = 0; /* If the pool is not full we want to add this object */
    if (dictSize(server.sharingpool) >= server.sharingpoolsize) {
        de = dictGetRandomKey(server.sharingpool);
        redisAssert(de != NULL);
        c = ((unsigned long) dictGetEntryVal(de))-1;
        dictGetEntryVal(de) = (void*) c;
        if (c == 0) dictDelete(server.sharingpool,de->key);
    }
    if (c == 0) {
        int retval;

        retval = dictAdd(server.sharingpool,o,(void*)1);
        redisAssert(retval == DICT_OK);
        incrRefCount(o);
    }
    return o;
}
/* Check whether the nul-terminated sds string 's' is exactly the decimal
 * representation of a long: no spaces or other characters before or after
 * the digits, and the number printed back must be byte-for-byte identical
 * to 's'.
 *
 * Returns REDIS_OK and stores the number in *longval (when longval is not
 * NULL) if so, REDIS_ERR otherwise. */
static int isStringRepresentableAsLong(sds s, long *longval) {
    char repr[32], *eptr;
    long parsed;
    int reprlen;

    parsed = strtol(s, &eptr, 10);
    if (*eptr != '\0') return REDIS_ERR;

    /* If the number converted back into a string is not identical
     * then it's not possible to encode the string as integer */
    reprlen = snprintf(repr,32,"%ld",parsed);
    if (sdslen(s) != (unsigned)reprlen) return REDIS_ERR;
    if (memcmp(repr,s,reprlen) != 0) return REDIS_ERR;

    if (longval != NULL) *longval = parsed;
    return REDIS_OK;
}
/* Try to encode a string object in order to save space: when the string
 * is the exact representation of a long, the sds string is freed and the
 * number is stored directly in o->ptr with REDIS_ENCODING_INT.
 *
 * Returns REDIS_OK if the object was encoded, REDIS_ERR if it was left
 * untouched (already encoded, shared, or not representable as a long). */
static int tryObjectEncoding(robj *o) {
    sds s = o->ptr;
    long value;

    /* Already encoded */
    if (o->encoding != REDIS_ENCODING_RAW) return REDIS_ERR;

    /* It's not safe to encode shared objects: shared objects can be shared
     * everywhere in the "object space" of Redis. Encoded objects can only
     * appear as "values" (and not, for instance, as keys) */
    if (o->refcount > 1) return REDIS_ERR;

    /* Currently we try to encode only strings */
    redisAssert(o->type == REDIS_STRING);

    /* Check if we can represent this string as a long integer */
    if (isStringRepresentableAsLong(s,&value) == REDIS_ERR)
        return REDIS_ERR;

    /* Ok, this object can be encoded */
    sdsfree(o->ptr);
    o->encoding = REDIS_ENCODING_INT;
    o->ptr = (void*) value;
    return REDIS_OK;
}
/* Get a decoded version of an object. A raw-encoded object is returned as
 * it is with its reference count incremented; an integer-encoded string
 * is returned as a new raw string object holding the number in decimal.
 * Any other type/encoding combination triggers an assertion. */
static robj *getDecodedObject(robj *o) {
    if (o->encoding == REDIS_ENCODING_RAW) {
        incrRefCount(o);
        return o;
    }
    if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
        char digits[32];

        snprintf(digits,32,"%ld",(long)o->ptr);
        return createStringObject(digits,strlen(digits));
    } else {
        redisAssert(1 != 1);
    }
}
/* Compare two string objects, returning the result of sdscmp() or
 * strcmp() (0 when the two arguments are the same object).
 *
 * The objects may be integer-encoded. In such a case snprintf() is used
 * to get a string representation of the number on the stack, which is
 * much faster than calling getDecodedObject(), and the comparison is done
 * with strcmp().
 *
 * Important note: when both objects are raw-encoded, binary-safe strings,
 * sdscmp() from sds.c applies memcmp(), so in that case this function can
 * be considered binary safe. */
static int compareStringObjects(robj *a, robj *b) {
    redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
    char numa[128], numb[128];
    char *left, *right;
    int rawa, rawb;

    if (a == b) return 0;

    rawa = (a->encoding == REDIS_ENCODING_RAW);
    rawb = (b->encoding == REDIS_ENCODING_RAW);

    if (rawa) {
        left = a->ptr;
    } else {
        snprintf(numa,sizeof(numa),"%ld",(long) a->ptr);
        left = numa;
    }
    if (rawb) {
        right = b->ptr;
    } else {
        snprintf(numb,sizeof(numb),"%ld",(long) b->ptr);
        right = numb;
    }

    if (rawa && rawb) return sdscmp(left,right);
    return strcmp(left,right);
}
/* Return the length of the string object 'o' as a string: the sds length
 * when raw-encoded, otherwise the number of characters needed to print
 * the long stored in o->ptr in decimal. */
static size_t stringObjectLen(robj *o) {
    char digits[32];

    redisAssert(o->type == REDIS_STRING);
    if (o->encoding == REDIS_ENCODING_RAW) return sdslen(o->ptr);
    return snprintf(digits,32,"%ld",(long)o->ptr);
}
/*============================ RDB saving/loading =========================== */
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 if the
 * byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
/* Write the time 't' to 'fp' as a 4 byte int32_t in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t seconds = (int32_t) t;

    return (fwrite(&seconds,4,1,fp) == 0) ? -1 : 0;
}
/* Write the length 'len' to 'fp' using the shortest of three forms,
 * selected by the two most significant bits of the first byte: 6 bit
 * (one byte), 14 bit (two bytes) or a marker byte followed by the length
 * as 4 bytes in network byte order. Check rdbLoadLen() comments for more
 * info. Returns 0 on success, -1 on write error. */
static int rdbSaveLen(FILE *fp, uint32_t len) {
    unsigned char buf[2];

    if (len < (1<<6)) {
        /* Save a 6 bit len */
        buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
        return (fwrite(buf,1,1,fp) == 0) ? -1 : 0;
    }
    if (len < (1<<14)) {
        /* Save a 14 bit len */
        buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
        buf[1] = len&0xFF;
        return (fwrite(buf,2,1,fp) == 0) ? -1 : 0;
    }

    /* Save a 32 bit len */
    buf[0] = (REDIS_RDB_32BITLEN<<6);
    if (fwrite(buf,1,1,fp) == 0) return -1;
    len = htonl(len);
    if (fwrite(&len,4,1,fp) == 0) return -1;
    return 0;
}
/* String objects in the form "2391" "-100", without any space and with a
 * value that fits in an 8, 16 or 32 bit signed integer, can be encoded as
 * integers to save space.
 *
 * On success 'enc' is filled with the encoding byte followed by the value
 * in little endian order, and the number of bytes used (2, 3 or 5) is
 * returned. 0 is returned if 's' can't be encoded this way. */
static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
    char repr[32], *eptr;
    long long value;
    size_t slen;

    /* Check if it's possible to encode this value as a number */
    value = strtoll(s, &eptr, 10);
    if (*eptr != '\0') return 0;

    /* If the number converted back into a string is not identical
     * then it's not possible to encode the string as integer */
    snprintf(repr,32,"%lld",value);
    slen = sdslen(s);
    if (strlen(repr) != slen || memcmp(repr,s,slen) != 0) return 0;

    /* Finally check if it fits in our ranges */
    if (value >= -(1<<7) && value <= (1<<7)-1) {
        enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
        enc[1] = value&0xFF;
        return 2;
    }
    if (value >= -(1<<15) && value <= (1<<15)-1) {
        enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
        enc[1] = value&0xFF;
        enc[2] = (value>>8)&0xFF;
        return 3;
    }
    if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
        enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
        enc[1] = value&0xFF;
        enc[2] = (value>>8)&0xFF;
        enc[3] = (value>>16)&0xFF;
        enc[4] = (value>>24)&0xFF;
        return 5;
    }
    return 0;
}
/* Try to save the raw string object 'obj' on 'fp' in LZF compressed form:
 * an encoding byte, the compressed length, the original length and the
 * compressed data.
 *
 * Returns the compressed length (> 0) when the data was written, 0 when
 * the string was not saved because it is too short or can't be compressed
 * into at least four bytes less than its size (the caller should then
 * save it verbatim), -1 on write error. */
static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
    size_t len = sdslen(obj->ptr);
    unsigned int comprlen, outlen;
    unsigned char byte;
    void *out;

    /* We require at least four bytes compression for this to be worth it.
     * The test is done before subtracting: the lengths are unsigned, so
     * len-4 would wrap around to a huge value for strings shorter than
     * four bytes instead of going negative. */
    if (len <= 4) return 0;
    outlen = len-4;
    if ((out = zmalloc(outlen+1)) == NULL) return 0;
    comprlen = lzf_compress(obj->ptr, len, out, outlen);
    if (comprlen == 0) {
        zfree(out);
        return 0;
    }
    /* Data compressed! Let's save it on disk */
    byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
    if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
    if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
    if (rdbSaveLen(fp,len) == -1) goto writeerr;
    if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
    zfree(out);
    return comprlen;

writeerr:
    zfree(out);
    return -1;
}
/* Save a raw-encoded string object on disk. Three forms are tried in
 * order: the special integer form when the string is the representation
 * of an integer value, LZF compression when enabled, and finally the
 * plain [len][data] form. Returns 0 on success, -1 on write error. */
static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
    size_t len = sdslen(obj->ptr);

    /* Try integer encoding */
    if (len <= 11) {
        unsigned char buf[5];
        int enclen = rdbTryIntegerEncoding(obj->ptr,buf);

        if (enclen > 0)
            return (fwrite(buf,enclen,1,fp) == 0) ? -1 : 0;
    }

    /* Try LZF compression - under 20 bytes it's unable to compress even
     * aaaaaaaaaaaaaaaaaa so skip it */
    if (server.rdbcompression && len > 20) {
        int retval = rdbSaveLzfStringObject(fp,obj);

        if (retval == -1) return -1;
        if (retval > 0) return 0;
        /* retval == 0 means data can't be compressed, save the old way */
    }

    /* Store verbatim */
    if (rdbSaveLen(fp,len) == -1) return -1;
    if (len != 0 && fwrite(obj->ptr,len,1,fp) == 0) return -1;
    return 0;
}
/* Like rdbSaveStringObjectRaw() but handles encoded objects too: a
 * non-raw object is decoded into a temporary object that is released
 * after saving. Returns 0 on success, -1 on write error. */
static int rdbSaveStringObject(FILE *fp, robj *obj) {
    robj *decoded;
    int retval;

    /* Avoid incr/decr ref count business when possible.
     * This plays well with copy-on-write given that we are probably
     * in a child process (BGSAVE). Also this makes sure key objects
     * of swapped objects are not incRefCount-ed (an assert does not allow
     * this in order to avoid bugs) */
    if (obj->encoding == REDIS_ENCODING_RAW)
        return rdbSaveStringObjectRaw(fp,obj);

    decoded = getDecodedObject(obj);
    retval = rdbSaveStringObjectRaw(fp,decoded);
    decrRefCount(decoded);
    return retval;
}
/* Save a double value. Doubles are saved as strings ("%.17g" format)
 * prefixed by an unsigned 8 bit integer specifying the length of the
 * representation. This 8 bit integer has special values, saved alone,
 * for the following conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* the special values take just the prefix byte */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        char *repr = (char*)buf+1;

        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
/* Save a Redis object on 'fp'.
 *
 * Strings are saved with rdbSaveStringObject(). Lists and sets are saved
 * as the number of elements followed by every element as a string; sorted
 * sets as the number of elements followed by element/score pairs. Any
 * other type triggers an assertion.
 *
 * Returns 0 on success, -1 on write error. The dict iterator used for
 * sets and sorted sets is released on every path, errors included. */
static int rdbSaveObject(FILE *fp, robj *o) {
    dictIterator *di = NULL;
    dictEntry *de;

    if (o->type == REDIS_STRING) {
        /* Save a string value */
        if (rdbSaveStringObject(fp,o) == -1) goto err;
    } else if (o->type == REDIS_LIST) {
        /* Save a list value */
        list *list = o->ptr;
        listIter li;
        listNode *ln;

        if (rdbSaveLen(fp,listLength(list)) == -1) goto err;
        listRewind(list,&li);
        while((ln = listNext(&li))) {
            robj *eleobj = listNodeValue(ln);

            if (rdbSaveStringObject(fp,eleobj) == -1) goto err;
        }
    } else if (o->type == REDIS_SET) {
        /* Save a set value */
        dict *set = o->ptr;

        di = dictGetIterator(set);
        if (rdbSaveLen(fp,dictSize(set)) == -1) goto err;
        while((de = dictNext(di)) != NULL) {
            robj *eleobj = dictGetEntryKey(de);

            if (rdbSaveStringObject(fp,eleobj) == -1) goto err;
        }
    } else if (o->type == REDIS_ZSET) {
        /* Save a sorted set value */
        zset *zs = o->ptr;

        di = dictGetIterator(zs->dict);
        if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto err;
        while((de = dictNext(di)) != NULL) {
            robj *eleobj = dictGetEntryKey(de);
            double *score = dictGetEntryVal(de);

            if (rdbSaveStringObject(fp,eleobj) == -1) goto err;
            if (rdbSaveDoubleValue(fp,*score) == -1) goto err;
        }
    } else {
        redisAssert(0 != 0);
    }
    if (di) dictReleaseIterator(di);
    return 0;

err:
    /* Don't leak the iterator when a write fails half way. */
    if (di) dictReleaseIterator(di);
    return -1;
}
/* Return the length the object will have on disk if saved with
 * the rdbSaveObject() function. Currently we use a trick to get
 * this length with very little changes to the code: the object is really
 * saved on 'fp' (server.devnull when 'fp' is NULL) after rewinding it,
 * and the resulting file offset is returned. In the future we could
 * switch to a faster solution. */
static off_t rdbSavedObjectLen(robj *o, FILE *fp) {
    int retval;

    if (fp == NULL) fp = server.devnull;
    rewind(fp);
    /* The save must run outside assert(): with NDEBUG defined the whole
     * assert() expression is compiled out. rdbSaveObject() reports
     * failures with -1, so that is the value to check against. */
    retval = rdbSaveObject(fp,o);
    assert(retval != -1);
    (void) retval; /* avoid an unused variable warning with NDEBUG */
    return ftello(fp);
}
/* Return the number of pages required to save this object in the swap
 * file: the saved length divided by server.vm_page_size, rounded up. */
static off_t rdbSavedObjectPages(robj *o, FILE *fp) {
    off_t len = rdbSavedObjectLen(o,fp);
    off_t rounded = len+(server.vm_page_size-1);

    return rounded/server.vm_page_size;
}
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
static int rdbSave(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
FILE *fp;
char tmpfile[256];
int j;
time_t now = time(NULL);
/* Wait for I/O therads to terminate, just in case this is a
* foreground-saving, to avoid seeking the swap file descriptor at the
* same time. */
if (server.vm_enabled)
waitEmptyIOJobsQueue();
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
return REDIS_ERR;
}
if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
/* Write the SELECT DB opcode */
if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
if (rdbSaveLen(fp,j) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
robj *key = dictGetEntryKey(de);
robj *o = dictGetEntryVal(de);
time_t expiretime = getExpire(db,key);
/* Save the expire time */
if (expiretime != -1) {
/* If this key is already expired skip it */
if (expiretime < now) continue;
if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
if (rdbSaveTime(fp,expiretime) == -1) goto werr;
}
/* Save the key and associated value. This requires special
* handling if the value is swapped out. */
if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY ||
key->storage == REDIS_VM_SWAPPING) {
/* Save type, key, value */
if (rdbSaveType(fp,o->type) == -1) goto werr;
if (rdbSaveStringObject(fp,key) == -1) goto werr;
if (rdbSaveObject(fp,o) == -1) goto werr;
} else {
/* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
robj *po;
/* Get a preview of the object in memory */
po = vmPreviewObject(key);
/* Save type, key, value */
if (rdbSaveType(fp,key->vtype) == -1) goto werr;
if (rdbSaveStringObject(fp,key) == -1) goto werr;
if (rdbSaveObject(fp,po) == -1) goto werr;
/* Remove the loaded object from memory */
decrRefCount(po);
}
}
dictReleaseIterator(di);
}
/* EOF opcode */
if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
fsync(fileno(fp));
fclose(fp);
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
return REDIS_OK;
werr:
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
/* Start saving the DB on 'filename' in a child process. Returns REDIS_OK
 * if the child was started, REDIS_ERR if a background save is already in
 * progress or fork() failed. The child exits with status 0 when rdbSave()
 * succeeds and 1 otherwise; the parent records the child pid in
 * server.bgsavechildpid. */
static int rdbSaveBackground(char *filename) {
    pid_t childpid;

    if (server.bgsavechildpid != -1) return REDIS_ERR;
    if (server.vm_enabled) waitEmptyIOJobsQueue();

    childpid = fork();
    if (childpid == 0) {
        /* Child */
        if (server.vm_enabled) vmReopenSwapFile();
        close(server.fd);
        exit(rdbSave(filename) == REDIS_OK ? 0 : 1);
    }

    /* Parent */
    if (childpid == -1) {
        redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
            strerror(errno));
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
    server.bgsavechildpid = childpid;
    return REDIS_OK;
}
/* Remove the temporary file "temp-<pid>.rdb" of the process with pid
 * 'childpid'. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
/* Read a one byte type from 'fp'. Returns the byte as a non negative
 * int, or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
/* Read a 4 byte integer (in the byte order of the host) from 'fp' and return
 * it as a time_t. Returns -1 if the four bytes can't be read, so a stored
 * value of -1 can't be told apart from an error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,sizeof(raw),1,fp) != 1) return -1;
    return (time_t) raw;
}
/* Load an encoded length from the DB file. The two most significant bits of
 * the first byte select the format (see the REDIS_RDB_* defines):
 *
 *   REDIS_RDB_6BITLEN   the length is the remaining 6 bits
 *   REDIS_RDB_14BITLEN  the length is the remaining 6 bits (high part)
 *                       followed by one more byte (low part)
 *   REDIS_RDB_ENCVAL    the remaining 6 bits are not a length but an
 *                       "encoding type"
 *   anything else       the length is the following 4 bytes, converted
 *                       with ntohl()
 *
 * If 'isencoded' is not NULL it is set to 1 when the returned value is an
 * encoding type and to 0 in every other case. REDIS_RDB_LENERR is returned
 * when the file ends before the whole length was read. */
static uint32_t rdbLoadLen(FILE *fp, int *isencoded) {
    unsigned char first, second;
    uint32_t netlen;
    int kind;

    if (isencoded) *isencoded = 0;
    if (fread(&first,1,1,fp) == 0) return REDIS_RDB_LENERR;
    kind = (first&0xC0)>>6;

    if (kind == REDIS_RDB_6BITLEN) {
        return first&0x3F;
    }
    if (kind == REDIS_RDB_ENCVAL) {
        if (isencoded) *isencoded = 1;
        return first&0x3F;
    }
    if (kind == REDIS_RDB_14BITLEN) {
        if (fread(&second,1,1,fp) == 0) return REDIS_RDB_LENERR;
        return ((first&0x3F)<<8)|second;
    }
    /* 32 bit length */
    if (fread(&netlen,4,1,fp) == 0) return REDIS_RDB_LENERR;
    return ntohl(netlen);
}
/* Load an integer stored in one of the REDIS_RDB_ENC_INT* encodings and
 * return it as a new REDIS_STRING object holding its decimal representation.
 *
 * The 16 and 32 bit forms are read least significant byte first and are
 * interpreted as signed values. NULL is returned on short read. An unknown
 * 'enctype' triggers an assertion.
 *
 * The bytes are widened to uint32_t before being shifted: shifting the
 * promoted (signed) int left by 24 bits is undefined behavior when the top
 * byte is >= 0x80. */
static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
    unsigned char enc[4];
    long long val;

    if (enctype == REDIS_RDB_ENC_INT8) {
        if (fread(enc,1,1,fp) == 0) return NULL;
        val = (signed char)enc[0];
    } else if (enctype == REDIS_RDB_ENC_INT16) {
        uint16_t v;

        if (fread(enc,2,1,fp) == 0) return NULL;
        v = (uint16_t)((uint32_t)enc[0]|((uint32_t)enc[1]<<8));
        val = (int16_t)v;
    } else if (enctype == REDIS_RDB_ENC_INT32) {
        uint32_t v;

        if (fread(enc,4,1,fp) == 0) return NULL;
        v = (uint32_t)enc[0] |
            ((uint32_t)enc[1]<<8) |
            ((uint32_t)enc[2]<<16) |
            ((uint32_t)enc[3]<<24);
        val = (int32_t)v;
    } else {
        val = 0; /* anti-warning */
        redisAssert(0!=0);
    }
    return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
}
/* Load an LZF compressed string. On disk it is stored as the compressed
 * length, the uncompressed length (both in rdbLoadLen() format) and then the
 * compressed payload.
 *
 * Returns a new REDIS_STRING object with the decompressed data, or NULL on
 * short read, allocation failure or when lzf_decompress() returns 0. */
static robj *rdbLoadLzfStringObject(FILE*fp) {
    unsigned int complen, rawlen;
    unsigned char *compressed = NULL;
    sds plain = NULL;

    complen = rdbLoadLen(fp,NULL);
    if (complen == REDIS_RDB_LENERR) return NULL;
    rawlen = rdbLoadLen(fp,NULL);
    if (rawlen == REDIS_RDB_LENERR) return NULL;

    compressed = zmalloc(complen);
    if (compressed == NULL) goto err;
    plain = sdsnewlen(NULL,rawlen);
    if (plain == NULL) goto err;
    if (fread(compressed,complen,1,fp) == 0) goto err;
    if (lzf_decompress(compressed,complen,plain,rawlen) == 0) goto err;

    zfree(compressed);
    return createObject(REDIS_STRING,plain);

err:
    zfree(compressed);
    sdsfree(plain);
    return NULL;
}
/* Load a string object from the DB file.
 *
 * The string is introduced by an rdbLoadLen() value. When that value is an
 * encoding type the work is delegated to rdbLoadIntegerObject() or
 * rdbLoadLzfStringObject(); an unknown encoding type triggers an assertion.
 * Otherwise the value is the number of raw bytes that follow.
 *
 * The loaded object is passed through tryObjectSharing() before being
 * returned. NULL is returned on short read. */
static robj *rdbLoadStringObject(FILE*fp) {
    int isencoded;
    uint32_t len;
    sds val;

    len = rdbLoadLen(fp,&isencoded);
    if (isencoded) {
        if (len == REDIS_RDB_ENC_INT8 ||
            len == REDIS_RDB_ENC_INT16 ||
            len == REDIS_RDB_ENC_INT32)
        {
            return tryObjectSharing(rdbLoadIntegerObject(fp,len));
        }
        if (len == REDIS_RDB_ENC_LZF) {
            return tryObjectSharing(rdbLoadLzfStringObject(fp));
        }
        redisAssert(0!=0);
    }

    if (len == REDIS_RDB_LENERR) return NULL;
    val = sdsnewlen(NULL,len);
    /* A zero length string has no payload to read */
    if (len == 0 || fread(val,len,1,fp) != 0) {
        return tryObjectSharing(createObject(REDIS_STRING,val));
    }
    sdsfree(val);
    return NULL;
}
/* Load a double from the DB file and store it at '*val'.
 * For information about double serialization check rdbSaveDoubleValue().
 *
 * The first byte is either a special marker (255: negative infinity,
 * 254: positive infinity, 253: NaN) or the length of the textual
 * representation that follows.
 *
 * Returns 0 on success, -1 on short read or if the text can't be parsed
 * as a number.
 *
 * The length byte can be as large as 252, so the buffer must be able to hold
 * that many characters plus the terminator: a smaller buffer would be
 * overflowed by a corrupted or malicious file. */
static int rdbLoadDoubleValue(FILE *fp, double *val) {
    char buf[256];
    unsigned char len;

    if (fread(&len,1,1,fp) == 0) return -1;
    switch(len) {
    case 255: *val = R_NegInf; return 0;
    case 254: *val = R_PosInf; return 0;
    case 253: *val = R_Nan; return 0;
    default:
        if (fread(buf,len,1,fp) == 0) return -1;
        buf[len] = '\0';
        /* Don't hand back an unset '*val' if the text is not a number */
        if (sscanf(buf, "%lg", val) != 1) return -1;
        return 0;
    }
}
/* Load a Redis object of the specified type from the specified file.
 * On success a newly allocated object is returned, otherwise NULL.
 *
 * Strings are a single string object. Lists and sets are stored as a length
 * followed by that many string objects. Sorted sets are stored as a length
 * followed by that many (string object, double score) pairs.
 *
 * On failure everything allocated so far (the partially filled aggregate,
 * the current element and its score) is released before returning NULL.
 * An unknown 'type' triggers an assertion. */
static robj *rdbLoadObject(int type, FILE *fp) {
    robj *o;

    if (type == REDIS_STRING) {
        /* Read string value */
        if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
        tryObjectEncoding(o);
    } else if (type == REDIS_LIST || type == REDIS_SET) {
        /* Read list/set value */
        uint32_t listlen;

        if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
        o = (type == REDIS_LIST) ? createListObject() : createSetObject();
        /* Load every single element of the list/set */
        while(listlen--) {
            robj *ele;

            if ((ele = rdbLoadStringObject(fp)) == NULL) {
                decrRefCount(o);
                return NULL;
            }
            tryObjectEncoding(ele);
            if (type == REDIS_LIST) {
                listAddNodeTail((list*)o->ptr,ele);
            } else {
                dictAdd((dict*)o->ptr,ele,NULL);
            }
        }
    } else if (type == REDIS_ZSET) {
        /* Read sorted set value */
        uint32_t zsetlen;
        zset *zs;

        if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
        o = createZsetObject();
        zs = o->ptr;
        /* Load every single element of the sorted set */
        while(zsetlen--) {
            robj *ele;
            double *score;

            if ((ele = rdbLoadStringObject(fp)) == NULL) {
                decrRefCount(o);
                return NULL;
            }
            tryObjectEncoding(ele);
            /* The score is allocated only once the element is available,
             * so there is nothing to free if the element can't be read */
            score = zmalloc(sizeof(double));
            if (rdbLoadDoubleValue(fp,score) == -1) {
                zfree(score);
                decrRefCount(ele);
                decrRefCount(o);
                return NULL;
            }
            dictAdd(zs->dict,ele,score);
            zslInsert(zs->zsl,*score,ele);
            incrRefCount(ele); /* added to skiplist */
        }
    } else {
        redisAssert(0 != 0);
    }
    return o;
}
/* Load the dump stored in 'filename' into the server databases.
 *
 * The file starts with a 9 byte header: the "REDIS" signature followed by
 * the format version in decimal (only version 1 is accepted). After the
 * header comes a sequence of records, each one introduced by a type byte:
 *
 *   REDIS_EXPIRETIME  an expire time follows, then the type byte of the
 *                     key the expire refers to
 *   REDIS_SELECTDB    a DB index follows; next keys are loaded into that DB
 *   REDIS_EOF         end of the dump
 *   anything else     an object type: a key and a value of that type follow
 *
 * Returns REDIS_OK on success, REDIS_ERR if the file can't be opened or the
 * signature/version is not the expected one. Short reads, duplicated keys
 * and a DB index >= server.dbnum terminate the process with exit(1). */
static int rdbLoad(char *filename) {
FILE *fp;
robj *keyobj = NULL;
uint32_t dbid;
int type, retval, rdbver;
/* Keys are loaded in DB 0 until a SELECTDB opcode is found */
dict *d = server.db[0].dict;
redisDb *db = server.db+0;
char buf[1024];
/* expiretime == -1 means "no expire pending for the next key" */
time_t expiretime = -1, now = time(NULL);
long long loadedkeys = 0;
fp = fopen(filename,"r");
if (!fp) return REDIS_ERR;
/* Header: 5 bytes of signature + 4 bytes of version */
if (fread(buf,9,1,fp) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) {
fclose(fp);
redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
return REDIS_ERR;
}
rdbver = atoi(buf+5);
if (rdbver != 1) {
fclose(fp);
redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
return REDIS_ERR;
}
while(1) {
robj *o;
/* Read type. */
if ((type = rdbLoadType(fp)) == -1) goto eoferr;
if (type == REDIS_EXPIRETIME) {
if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
/* We read the time so we need to read the object type again */
if ((type = rdbLoadType(fp)) == -1) goto eoferr;
}
if (type == REDIS_EOF) break;
/* Handle SELECT DB opcode as a special case */
if (type == REDIS_SELECTDB) {
if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR)
goto eoferr;
if (dbid >= (unsigned)server.dbnum) {
redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
exit(1);
}
db = server.db+dbid;
d = db->dict;
continue;
}
/* Read key */
if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr;
/* Read value */
if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr;
/* Add the new object in the hash table */
retval = dictAdd(d,keyobj,o);
if (retval == DICT_ERR) {
redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
exit(1);
}
/* Set the expire time if needed */
if (expiretime != -1) {
setExpire(db,keyobj,expiretime);
/* Delete this key if already expired */
if (expiretime < now) deleteKey(db,keyobj);
/* The expire applies only to the key that was just loaded */
expiretime = -1;
}
/* Clear keyobj so that the eoferr path will not decrement it */
keyobj = o = NULL;
/* Handle swapping while loading big datasets when VM is on */
loadedkeys++;
/* Every 5000 keys swap objects out until the used memory is no longer
 * above vm_max_memory, or until swapping one object fails */
if (server.vm_enabled && (loadedkeys % 5000) == 0) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
}
}
fclose(fp);
return REDIS_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */
if (keyobj) decrRefCount(keyobj);
redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
exit(1);
return REDIS_ERR; /* Just to avoid warning */
}
/*================================== Commands =============================== */
/* AUTH <password>
 *
 * The client is flagged as authenticated when no password is configured
 * (server.requirepass is NULL) or when the argument is equal to the
 * configured password. Otherwise the flag is cleared and an error is sent. */
static void authCommand(redisClient *c) {
    int granted = (server.requirepass == NULL) ||
                  (strcmp(c->argv[1]->ptr, server.requirepass) == 0);

    if (granted) {
        c->authenticated = 1;
        addReply(c,shared.ok);
        return;
    }
    c->authenticated = 0;
    addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
}
/* PING: reply with the shared.pong object. */
static void pingCommand(redisClient *c) {
addReply(c,shared.pong);
}
/* ECHO <message>: send the first argument back to the client as a bulk
 * reply, that is the length header, the argument itself and a final CRLF. */
static void echoCommand(redisClient *c) {
addReplyBulkLen(c,c->argv[1]);
addReply(c,c->argv[1]);
addReply(c,shared.crlf);
}
/*=================================== Strings =============================== */
/* Implementation shared by SET (nx == 0) and SETNX (nx != 0).
 *
 * argv[1] is the key, argv[2] the value. With nx set the key is written only
 * if it does not exist (after deleteIfVolatile() was called on it) and the
 * reply is shared.cone or shared.czero. Without nx an existing key is
 * overwritten and the reply is shared.ok.
 *
 * On a successful write server.dirty is incremented and removeExpire() is
 * called on the key. */
static void setGenericCommand(redisClient *c, int nx) {
    robj *key = c->argv[1];
    robj *val = c->argv[2];

    if (nx) deleteIfVolatile(c->db,key);

    if (dictAdd(c->db->dict,key,val) != DICT_ERR) {
        /* New key: the dictionary now references both objects */
        incrRefCount(key);
        incrRefCount(val);
    } else if (nx) {
        /* Key already there and we are not allowed to overwrite it */
        addReply(c,shared.czero);
        return;
    } else {
        /* If the old key is about a swapped value it is deleted, so that
         * this key object replaces the old one and the swap pages of the
         * old value are marked as free. In that case the key is going to be
         * added again, hence the new reference. */
        if (deleteIfSwapped(c->db,key)) incrRefCount(key);
        dictReplace(c->db->dict,key,val);
        incrRefCount(val);
    }

    server.dirty++;
    removeExpire(c->db,key);
    addReply(c, nx ? shared.cone : shared.ok);
}
/* SET: setGenericCommand() with the nx flag off. */
static void setCommand(redisClient *c) {
setGenericCommand(c,0);
}
/* SETNX: setGenericCommand() with the nx flag on. */
static void setnxCommand(redisClient *c) {
setGenericCommand(c,1);
}
/* Look up argv[1] and send its value to the client.
 *
 * A missing key gets shared.nullbulk and REDIS_OK is returned. A key not
 * holding a string gets shared.wrongtypeerr and REDIS_ERR is returned.
 * Otherwise the value is sent as a bulk reply and REDIS_OK is returned. */
static int getGenericCommand(redisClient *c) {
    robj *val = lookupKeyRead(c->db,c->argv[1]);

    if (val == NULL) {
        addReply(c,shared.nullbulk);
        return REDIS_OK;
    }
    if (val->type != REDIS_STRING) {
        addReply(c,shared.wrongtypeerr);
        return REDIS_ERR;
    }
    addReplyBulkLen(c,val);
    addReply(c,val);
    addReply(c,shared.crlf);
    return REDIS_OK;
}
/* GET: getGenericCommand(), ignoring its return value. */
static void getCommand(redisClient *c) {
getGenericCommand(c);
}
/* GETSET <key> <value>
 *
 * The current value is sent with getGenericCommand(); if that reports
 * REDIS_ERR nothing is written. Otherwise the key is set to the new value,
 * server.dirty is incremented and removeExpire() is called on the key. */
static void getsetCommand(redisClient *c) {
    robj *key, *newval;

    if (getGenericCommand(c) == REDIS_ERR) return;

    key = c->argv[1];
    newval = c->argv[2];
    if (dictAdd(c->db->dict,key,newval) != DICT_ERR) {
        /* New key: the dictionary holds a reference to the key object */
        incrRefCount(key);
    } else {
        dictReplace(c->db->dict,key,newval);
    }
    incrRefCount(newval);
    server.dirty++;
    removeExpire(c->db,key);
}
/* MGET <key> [<key> ...]
 *
 * The reply is a multi bulk with one entry per requested key, in the same
 * order. Keys that are missing or that do not hold a string are reported
 * with shared.nullbulk. */
static void mgetCommand(redisClient *c) {
    int j;

    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
    for (j = 1; j < c->argc; j++) {
        robj *val = lookupKeyRead(c->db,c->argv[j]);

        if (val == NULL || val->type != REDIS_STRING) {
            addReply(c,shared.nullbulk);
            continue;
        }
        addReplyBulkLen(c,val);
        addReply(c,val);
        addReply(c,shared.crlf);
    }
}
/* Implementation shared by MSET (nx == 0) and MSETNX (nx != 0).
 *
 * Arguments are key/value pairs starting at argv[1]; an incomplete pair
 * is refused with an error. With nx set nothing is written and shared.czero
 * is sent if at least one of the keys already exists. Otherwise every pair
 * is stored, removeExpire() is called on every key, server.dirty is
 * incremented by the number of pairs and the reply is shared.cone (nx) or
 * shared.ok. */
static void msetGenericCommand(redisClient *c, int nx) {
    int j, busykeys = 0;

    if ((c->argc % 2) == 0) {
        addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
        return;
    }
    /* Handle the NX flag. The MSETNX semantic is to return zero and set
     * nothing at all if at least one key already exists. */
    if (nx) {
        for (j = 1; j < c->argc; j += 2) {
            if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
                busykeys++;
            }
        }
    }
    if (busykeys) {
        addReply(c, shared.czero);
        return;
    }

    for (j = 1; j < c->argc; j += 2) {
        int retval;

        tryObjectEncoding(c->argv[j+1]);
        retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
        if (retval == DICT_ERR) {
            /* Same handling setGenericCommand() applies when overwriting:
             * if the existing key is about a swapped value it is deleted,
             * so that this key object replaces the old one and the swap
             * pages of the old value are marked as free. The key is then
             * added again by dictReplace(), hence the new reference. */
            if (deleteIfSwapped(c->db,c->argv[j]))
                incrRefCount(c->argv[j]);
            dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
            incrRefCount(c->argv[j+1]);
        } else {
            incrRefCount(c->argv[j]);
            incrRefCount(c->argv[j+1]);
        }
        removeExpire(c->db,c->argv[j]);
    }
    server.dirty += (c->argc-1)/2;
    addReply(c, nx ? shared.cone : shared.ok);
}
/* MSET: msetGenericCommand() with the nx flag off. */
static void msetCommand(redisClient *c) {
msetGenericCommand(c,0);
}
/* MSETNX: msetGenericCommand() with the nx flag on. */
static void msetnxCommand(redisClient *c) {
msetGenericCommand(c,1);
}
/* Add 'incr' to the integer stored at argv[1] and reply with the result.
 *
 * The starting value is 0 when the key is missing or does not hold a string.
 * A raw encoded string is parsed with strtoll() in base 10, an integer
 * encoded one is read from the object pointer; any other encoding triggers
 * an assertion.
 *
 * The result is stored as a new string object. When an existing key is
 * replaced removeExpire() is called on it. server.dirty is incremented and
 * the reply is the new value in integer reply form. */
static void incrDecrCommand(redisClient *c, long long incr) {
    long long value = 0;
    robj *o;

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o != NULL && o->type == REDIS_STRING) {
        if (o->encoding == REDIS_ENCODING_RAW)
            value = strtoll(o->ptr, NULL, 10);
        else if (o->encoding == REDIS_ENCODING_INT)
            value = (long)o->ptr;
        else
            redisAssert(1 != 1);
    }
    value += incr;

    o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
    tryObjectEncoding(o);
    if (dictAdd(c->db->dict,c->argv[1],o) == DICT_ERR) {
        dictReplace(c->db->dict,c->argv[1],o);
        removeExpire(c->db,c->argv[1]);
    } else {
        /* New key: the dictionary holds a reference to the key object */
        incrRefCount(c->argv[1]);
    }
    server.dirty++;

    addReply(c,shared.colon);
    addReply(c,o);
    addReply(c,shared.crlf);
}
/* INCR: incrDecrCommand() with an increment of 1. */
static void incrCommand(redisClient *c) {
incrDecrCommand(c,1);
}
static void decrCommand(redisClient *c) {
incrDecrCommand(c,-1);