Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'migrate-cache' into unstable

  • Loading branch information...
commit a779b7e901e9f863dc195d3590b554a917a0b3e2 2 parents aa2bf6b + 989a782
@antirez authored
View
154 src/cluster.c
@@ -1625,14 +1625,135 @@ void restoreCommand(redisClient *c) {
server.dirty++;
}
+/* MIGRATE socket cache implementation.
+ *
+ * We take a map between host:port and a TCP socket that we used to connect
+ * to this instance in recent time.
+ * These sockets are closed when the max number we cache is reached, and also
+ * in serverCron() when they have been around for more than a few seconds. */
+#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
+#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
+
+/* A cached MIGRATE connection: the value stored in
+ * server.migrate_cached_sockets, keyed by the "host:port" sds string. */
+typedef struct migrateCachedSocket {
+ int fd; /* TCP socket connected to the target instance. */
+ time_t last_use_time; /* Unix time of last use, checked against
+ * MIGRATE_SOCKET_CACHE_TTL to expire the entry. */
+} migrateCachedSocket;
+
+/* Return a TCP socket connected with the target instance, possibly returning
+ * a cached one.
+ *
+ * This function is responsible for sending errors to the client if a
+ * connection can't be established. In this case -1 is returned.
+ * Otherwise on success the socket is returned, and the caller should not
+ * attempt to free it after usage.
+ *
+ * If the caller detects an error while using the socket, migrateCloseSocket()
+ * should be called so that the connection will be created from scratch
+ * the next time. */
+int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
+ int fd;
+ sds name = sdsempty();
+ migrateCachedSocket *cs;
+
+ /* Check if we have an already cached socket for this ip:port pair.
+  * The cache key is the sds string "host:port". */
+ name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+ name = sdscatlen(name,":",1);
+ name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+ cs = dictFetchValue(server.migrate_cached_sockets,name);
+ if (cs) {
+ /* Cache hit: refresh last_use_time so the TTL sweep in
+  * migrateCloseTimedoutSockets() does not close a socket that is
+  * actively in use. The temporary key is no longer needed. */
+ sdsfree(name);
+ cs->last_use_time = server.unixtime;
+ return cs->fd;
+ }
+
+ /* No cached socket, create one. */
+ if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
+ /* Too many items, drop one at random. The dict key destructor
+  * frees the stored "host:port" string; the value is freed here. */
+ dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
+ cs = dictGetVal(de);
+ close(cs->fd);
+ zfree(cs);
+ dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+ }
+
+ /* Create the socket */
+ /* NOTE(review): the target address is taken from c->argv[1]/c->argv[2]
+  * rather than the 'host'/'port' parameters. Callers pass the same
+  * objects, but using the parameters directly would be safer. */
+ fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
+ atoi(c->argv[2]->ptr));
+ if (fd == -1) {
+ sdsfree(name);
+ addReplyErrorFormat(c,"Can't connect to target node: %s",
+ server.neterr);
+ return -1;
+ }
+ anetTcpNoDelay(server.neterr,fd);
+
+ /* Check if it connects within the specified timeout (milliseconds,
+  * as expected by aeWait()). */
+ if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
+ sdsfree(name);
+ addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
+ close(fd);
+ return -1;
+ }
+
+ /* Add to the cache and return it to the caller. Ownership of 'name'
+  * passes to the dict (its key destructor will free it), so it must
+  * not be freed here. */
+ cs = zmalloc(sizeof(*cs));
+ cs->fd = fd;
+ cs->last_use_time = server.unixtime;
+ dictAdd(server.migrate_cached_sockets,name,cs);
+ return fd;
+}
+
+/* Free a migrate cached connection. */
+/* Close and forget the cached connection for the given host/port pair, so
+ * that the next MIGRATE to this target will create a fresh socket. A no-op
+ * if there is no cached connection for this target. */
+void migrateCloseSocket(robj *host, robj *port) {
+ sds name = sdsempty();
+ migrateCachedSocket *cs;
+
+ /* Build the "host:port" cache key used by migrateGetSocket(). */
+ name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+ name = sdscatlen(name,":",1);
+ name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+ cs = dictFetchValue(server.migrate_cached_sockets,name);
+ if (!cs) {
+ sdsfree(name);
+ return;
+ }
+
+ /* The value is freed manually (the dict has no val destructor); the
+  * stored key copy is released by the dict's key destructor, while the
+  * local lookup key is freed here. */
+ close(cs->fd);
+ zfree(cs);
+ dictDelete(server.migrate_cached_sockets,name);
+ sdsfree(name);
+}
+
+/* Called periodically from serverCron(): close and remove cached MIGRATE
+ * sockets that have not been used for more than MIGRATE_SOCKET_CACHE_TTL
+ * seconds. */
+void migrateCloseTimedoutSockets(void) {
+ /* A safe iterator is required because entries are deleted while
+  * iterating. */
+ dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
+ dictEntry *de;
+
+ while((de = dictNext(di)) != NULL) {
+ migrateCachedSocket *cs = dictGetVal(de);
+
+ if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
+ /* Free the value manually; the key is freed by the dict's key
+  * destructor when the entry is deleted. */
+ close(cs->fd);
+ zfree(cs);
+ dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+ }
+ }
+ dictReleaseIterator(di);
+}
+
/* MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
- int fd, copy = 0, replace = 0, j;
+ int fd, copy, replace, j;
long timeout;
long dbid;
- long long ttl = 0, expireat;
+ long long ttl, expireat;
robj *o;
rio cmd, payload;
+ int retry_num = 0;
+
+try_again:
+ /* Initialization */
+ copy = 0;
+ replace = 0;
+ ttl = 0;
/* Parse additional options */
for (j = 6; j < c->argc; j++) {
@@ -1651,7 +1772,7 @@ void migrateCommand(redisClient *c) {
return;
if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
return;
- if (timeout <= 0) timeout = 1;
+ if (timeout <= 0) timeout = 1000;
/* Check if the key is here. If not we reply with success as there is
* nothing to migrate (for instance the key expired in the meantime), but
@@ -1662,17 +1783,8 @@ void migrateCommand(redisClient *c) {
}
/* Connect */
- fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
- atoi(c->argv[2]->ptr));
- if (fd == -1) {
- addReplyErrorFormat(c,"Can't connect to target node: %s",
- server.neterr);
- return;
- }
- if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
- addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
- return;
- }
+ fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
+ if (fd == -1) return; /* error sent to the client by migrateGetSocket() */
/* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty());
@@ -1704,6 +1816,7 @@ void migrateCommand(redisClient *c) {
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
/* Transfer the query to the other node in 64K chunks. */
+ errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
@@ -1749,19 +1862,22 @@ void migrateCommand(redisClient *c) {
}
sdsfree(cmd.io.buffer.ptr);
- close(fd);
return;
socket_wr_err:
- addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
sdsfree(cmd.io.buffer.ptr);
- close(fd);
+ migrateCloseSocket(c->argv[1],c->argv[2]);
+ if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
+ addReplySds(c,
+ sdsnew("-IOERR error or timeout writing to target instance\r\n"));
return;
socket_rd_err:
- addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
sdsfree(cmd.io.buffer.ptr);
- close(fd);
+ migrateCloseSocket(c->argv[1],c->argv[2]);
+ if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
+ addReplySds(c,
+ sdsnew("-IOERR error or timeout reading from target node\r\n"));
return;
}
View
26 src/redis.c
@@ -561,6 +561,16 @@ dictType clusterNodesDictType = {
NULL /* val destructor */
};
+/* Migrate cache dict type: maps an sds "host:port" key (owned by the dict,
+ * released by the key destructor) to a migrateCachedSocket pointer, which is
+ * freed manually by the cluster.c code — hence no val destructor. */
+dictType migrateCacheDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ NULL /* val destructor */
+};
+
int htNeedsResize(dict *dict) {
long long size, used;
@@ -972,16 +982,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* to detect transfer failures. */
run_with_period(1000) replicationCron();
- /* Run other sub-systems specific cron jobs */
+ /* Run the Redis Cluster cron. */
run_with_period(1000) {
if (server.cluster_enabled) clusterCron();
}
- /* Run the sentinel timer if we are in sentinel mode. */
+ /* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
+ /* Cleanup expired MIGRATE cached sockets. */
+ run_with_period(1000) {
+ migrateCloseTimedoutSockets();
+ }
+
server.cronloops++;
return 1000/REDIS_HZ;
}
@@ -1149,6 +1164,7 @@ void initServerConfig() {
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
server.lua_client = NULL;
server.lua_timedout = 0;
+ server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
updateLRUClock();
resetServerSaveParams();
@@ -2066,7 +2082,8 @@ sds genRedisInfoString(char *section) {
"keyspace_misses:%lld\r\n"
"pubsub_channels:%ld\r\n"
"pubsub_patterns:%lu\r\n"
- "latest_fork_usec:%lld\r\n",
+ "latest_fork_usec:%lld\r\n"
+ "migrate_cached_sockets:%ld\r\n",
server.stat_numconnections,
server.stat_numcommands,
getOperationsPerSecond(),
@@ -2077,7 +2094,8 @@ sds genRedisInfoString(char *section) {
server.stat_keyspace_misses,
dictSize(server.pubsub_channels),
listLength(server.pubsub_patterns),
- server.stat_fork_time);
+ server.stat_fork_time,
+ dictSize(server.migrate_cached_sockets));
}
/* Replication */
View
4 src/redis.h
@@ -646,7 +646,8 @@ struct redisServer {
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */
- char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
+ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
+ dict *migrate_cached_sockets;/* MIGRATE cached sockets */
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
off_t loading_total_bytes;
@@ -1174,6 +1175,7 @@ int clusterAddNode(clusterNode *node);
void clusterCron(void);
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
void clusterPropagatePublish(robj *channel, robj *message);
+void migrateCloseTimedoutSockets(void);
/* Sentinel */
void initSentinelConfig(void);
View
2  tests/test_helper.tcl
@@ -11,6 +11,7 @@ source tests/support/util.tcl
set ::all_tests {
unit/printver
+ unit/dump
unit/auth
unit/protocol
unit/basic
@@ -40,7 +41,6 @@ set ::all_tests {
unit/introspection
unit/limits
unit/obuf-limits
- unit/dump
unit/bitops
}
# Index to the next test to run in the ::all_tests list.
View
25 tests/unit/dump.tcl
@@ -43,6 +43,27 @@ start_server {tags {"dump"}} {
r dump nonexisting_key
} {}
+ test {MIGRATE is caching connections} {
+ # Note, we run this as first test so that the connection cache
+ # is empty.
+ set first [srv 0 client]
+ r set key "Some Value"
+ start_server {tags {"repl"}} {
+ set second [srv 0 client]
+ set second_host [srv 0 host]
+ set second_port [srv 0 port]
+
+ assert_match {*migrate_cached_sockets:0*} [r -1 info]
+ r -1 migrate $second_host $second_port key 9 1000
+ assert_match {*migrate_cached_sockets:1*} [r -1 info]
+ }
+ }
+
+ test {MIGRATE cached connections are released after some time} {
+ after 15000
+ assert_match {*migrate_cached_sockets:0*} [r info]
+ }
+
test {MIGRATE is able to migrate a key between two instances} {
set first [srv 0 client]
r set key "Some Value"
@@ -180,9 +201,9 @@ start_server {tags {"dump"}} {
assert {[$second exists key] == 0}
set rd [redis_deferring_client]
- $rd debug sleep 5.0 ; # Make second server unable to reply.
+ $rd debug sleep 1.0 ; # Make second server unable to reply.
set e {}
- catch {r -1 migrate $second_host $second_port key 9 1000} e
+ catch {r -1 migrate $second_host $second_port key 9 500} e
assert_match {IOERR*} $e
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.