Skip to content

Commit

Permalink
first attempt to non blocking implementation of slave replication and…
Browse files Browse the repository at this point in the history
… SYNC bulk data download. Never compiled so far...
  • Loading branch information
antirez committed Nov 4, 2010
1 parent 4d7e125 commit f4aa600
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 44 deletions.
1 change: 1 addition & 0 deletions src/networking.c
Expand Up @@ -467,6 +467,7 @@ void freeClient(redisClient *c) {
/* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
/* FIXME */
server.replstate = REDIS_REPL_CONNECT;
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so
Expand Down
12 changes: 4 additions & 8 deletions src/redis.c
Expand Up @@ -633,14 +633,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}

/* Check if we should connect to a MASTER */
if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
if (syncWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
if (server.appendonly) rewriteAppendOnlyFileBackground();
}
}
/* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */
if (!(loops % 10)) replicationCron(void);

return 100;
}

Expand Down
14 changes: 12 additions & 2 deletions src/redis.h
Expand Up @@ -152,7 +152,8 @@
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */
#define REDIS_REPL_TRANFER 2 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 3 /* Connected to master */

/* Slave replication state - from the point of view of master
* Note that in SEND_BULK and ONLINE state the slave receives new updates
Expand Down Expand Up @@ -401,15 +402,23 @@ struct redisServer {
int activerehashing;
/* Replication related */
int isslave;
/* Slave specific fields */
char *masterauth;
char *masterhost;
int masterport;
redisClient *master; /* client that is master for this slave */
int replstate;
int replstate; /* replication status if the instance is a slave */
off_t repl_transfer_left; /* bytes left reading .rdb if this is a slave */
int repl_transfer_s; /* slave -> master SYNC socket */
int repl_transfer_fd; /* slave -> master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* unix time of the latest read, for timeout */
/* Limits */
unsigned int maxclients;
unsigned long long maxmemory;
int maxmemory_policy;
int maxmemory_samples;
/* Blocked clients */
unsigned int blpop_blocked_clients;
unsigned int vm_blocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we
Expand Down Expand Up @@ -713,6 +722,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
int syncWithMaster(void);
void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void);

/* RDB persistence */
int rdbLoad(char *filename);
Expand Down
134 changes: 100 additions & 34 deletions src/replication.c
Expand Up @@ -5,6 +5,8 @@
#include <fcntl.h>
#include <sys/stat.h>

/* ---------------------------------- MASTER -------------------------------- */

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
Expand Down Expand Up @@ -288,9 +290,68 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
}
}

/* ----------------------------------- SLAVE -------------------------------- */

/* Abort the async download of the bulk dataset while SYNC-ing with master */
void replicationAbortSyncTransfer(void) {
redisAssert(server.replstate == REDIS_REPL_TRANSFER);

aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
close(server.repl_transfer_s);
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
server.replstate = REDIS_REPL_CONNECT;
}

/* Asynchronously read the SYNC payload we receive from a master */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
unsigned char buf[4096]
size_t nread, readlen;

readlen = (server.repl_transfer_left < sizeof(buf)) ?
server.repl_transfer_left : sizeof(buf);
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
replicationAbortSyncTransfer();
return;
}
server.repl_transfer_lastio = time(NULL);
if (write(server.repl_transfer_fd,buf,nread) != nread) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
server.repl_transfer_left -= nread;
/* Check if the transfer is now complete */
if (server.repl_transfer_left == 0) {
if (rename(server.repl_transfer_tmpfile,server.dbfilename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
emptyDb();
if (rdbLoad(server.dbfilename) != REDIS_OK) {
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
replicationAbortSyncTransfer();
return;
}
/* Final setup of the connected slave <- master link */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.master = createClient(server.repl_transfer_s);
server.master->flags |= REDIS_MASTER;
server.master->authenticated = 1;
server.replstate = REDIS_REPL_CONNECTED;
}
}

int syncWithMaster(void) {
char buf[1024], tmpfile[256], authcmd[1024];
long dumpsize;
off_t dumpsize;
int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
int dfd, maxtries = 5;

Expand Down Expand Up @@ -362,43 +423,21 @@ int syncWithMaster(void) {
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
return REDIS_ERR;
}
while(dumpsize) {
int nread, nwritten;

nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
close(fd);
close(dfd);
return REDIS_ERR;
}
nwritten = write(dfd,buf,nread);
if (nwritten == -1) {
redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
close(fd);
close(dfd);
return REDIS_ERR;
}
dumpsize -= nread;
}
close(dfd);
if (rename(tmpfile,server.dbfilename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
unlink(tmpfile);
close(fd);
return REDIS_ERR;
}
emptyDb();
if (rdbLoad(server.dbfilename) != REDIS_OK) {
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el, fd, AE_READABLE,readSyncBulkPayload) ==
AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
server.master = createClient(fd);
server.master->flags |= REDIS_MASTER;
server.master->authenticated = 1;
server.replstate = REDIS_REPL_CONNECTED;
server.replstate = REDIS_REPL_TRANSFER;
server.repl_transfer_left = dumpsize;
server.repl_transfer_s = fd;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = time(NULL);
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return REDIS_OK;
}

Expand All @@ -409,6 +448,8 @@ void slaveofCommand(redisClient *c) {
sdsfree(server.masterhost);
server.masterhost = NULL;
if (server.master) freeClient(server.master);
if (server.replstate == REDIS_REPL_TRANSFER)
replicationAbortSyncTransfer();
server.replstate = REDIS_REPL_NONE;
redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
}
Expand All @@ -417,9 +458,34 @@ void slaveofCommand(redisClient *c) {
server.masterhost = sdsdup(c->argv[1]->ptr);
server.masterport = atoi(c->argv[2]->ptr);
if (server.master) freeClient(server.master);
if (server.replstate == REDIS_REPL_TRANSFER)
replicationAbortSyncTransfer();
server.replstate = REDIS_REPL_CONNECT;
redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
server.masterhost, server.masterport);
}
addReply(c,shared.ok);
}

/* --------------------------- REPLICATION CRON ---------------------------- */

#define REDIS_REPL_TRANSFER_TIMEOUT 60

void replicationCron(void) {
/* Bulk transfer I/O timeout? */
if (server.masterhost && server.replstate == REDIS_REPL_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > REDIS_REPL_TRANSFER_TIMEOUT)
{
redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
replicationAbortSyncTransfer();
}

/* Check if we should connect to a MASTER */
if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
if (syncWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
if (server.appendonly) rewriteAppendOnlyFileBackground();
}
}
}

0 comments on commit f4aa600

Please sign in to comment.