PSYNC2: different improvements to Redis replication.
The gist of the changes is that partial resynchronizations between
slaves and masters (with no need for a full resync with RDB transfer
and so forth) now work in a number of cases where they were impossible
in the past. For instance:

1. When a slave is promoted to master, the slaves of the old master can
partially resynchronize with the new master.

2. Chained slaves (slaves of slaves) can be moved to replicate from other
slaves or from the master itself, without requiring a full resync.

3. The master itself, after being turned into a slave, is able to
partially resynchronize with the new master, when it joins replication
again.

To achieve this, the following main changes were made:

* Slaves also keep a replication backlog, not just masters.

* Same stream replication for all the slaves and sub slaves. The
replication stream is identical from the top level master to its slaves
and is also the same from the slaves to their sub-slaves and so forth.
This means that if a slave is later promoted to master, it has the
same replication backlog, and can partially resynchronize with its
slaves (that were previously slaves of the old master).

* A given replication history is no longer identified by the `runid` of
a Redis node. There is instead a `replication ID` which changes every
time the instance has a new history no longer coherent with the past
one. So, for example, slaves publish the same replication history of
their master, however when they are turned into masters, they publish
a new replication ID, but still remember the old ID, so that they are
able to partially resynchronize with slaves of the old master (up to a
given offset).

* The replication protocol was slightly modified so that a new extended
+CONTINUE reply from the master is able to inform the slave of a
replication ID change.

* REPLCONF CAPA is used in order to notify masters that a slave is able
to understand the new +CONTINUE reply.

* The RDB file was extended with an auxiliary field that is able to
select a given DB after loading in the slave, so that the slave can
continue receiving the replication stream from the point it was
disconnected without requiring the master to insert "SELECT" statements.
This is useful in order to guarantee the "same stream" property, because
the slave must be able to accumulate an identical backlog.

* Slave pings to sub-slaves are now sent in a special form when the
top-level master is disconnected, so that they do not interfere with the
replication stream. We just use out of band "\n" bytes, as in other parts
of the Redis protocol.
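
To make the replication ID rule above concrete, here is a minimal sketch in C of how an instance might decide whether a PSYNC request can be served with a partial resynchronization. It is not the code from this commit: the `repl_state_sketch` structure, its field names, and `can_partial_resync()` are hypothetical, and the sketch assumes the backlog is described by the offset of its first byte plus the amount of history it holds.

```c
#include <assert.h>
#include <string.h>

#define REPLID_LEN 40

/* Hypothetical, simplified replication state (names are assumptions). */
typedef struct {
    char replid[REPLID_LEN+1];      /* Current replication ID. */
    char replid2[REPLID_LEN+1];     /* Previous ID, remembered after promotion. */
    long long second_replid_offset; /* replid2 is accepted up to this offset. */
    long long backlog_off;          /* Offset of the first byte in the backlog. */
    long long backlog_histlen;      /* Bytes of history held in the backlog. */
} repl_state_sketch;

/* Return 1 if a slave asking for (replid, offset) can continue with a
 * partial resync, 0 if a full resync would be needed. */
static int can_partial_resync(const repl_state_sketch *s,
                              const char *replid, long long offset)
{
    assert(s != NULL && replid != NULL);

    /* The requested history must be the current one, or the old one
     * up to the offset at which the two histories diverged. */
    int same_history = strcmp(replid, s->replid) == 0;
    int old_history = strcmp(replid, s->replid2) == 0 &&
                      offset <= s->second_replid_offset;
    if (!same_history && !old_history) return 0;

    /* The requested offset must still be covered by the backlog. */
    if (s->backlog_histlen == 0) return 0;
    if (offset < s->backlog_off ||
        offset > s->backlog_off + s->backlog_histlen) return 0;
    return 1;
}
```

With a state whose backlog covers offsets 900 to 1100 and whose old ID is valid up to offset 1000, a request for the old ID at offset 950 is accepted, while the same ID at offset 1001 is refused because that part of the stream belongs to the new history.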

An old design document is available here:

https://gist.github.com/antirez/ae068f95c0d084891305

However, the implementation is not identical to that description because,
while implementing it, various changes were needed in order
to make things work well.
antirez committed Nov 9, 2016
1 parent 18d32c7 commit 2669fb8
Showing 10 changed files with 440 additions and 143 deletions.
4 changes: 4 additions & 0 deletions redis.conf
@@ -402,6 +402,10 @@ repl-disable-tcp-nodelay no
# need to elapse, starting from the time the last slave disconnected, for
# the backlog buffer to be freed.
#
# Note that slaves never free the backlog for timeout, since they may be
# promoted to masters later, and should be able to correctly "partially
# resynchronize" with the slaves: hence they should always accumulate backlog.
#
# A value of 0 means to never release the backlog.
#
# repl-backlog-ttl 3600
4 changes: 2 additions & 2 deletions src/aof.c
@@ -653,7 +653,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb) != C_OK) {
if (rdbLoadRio(&rdb,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@@ -1152,7 +1152,7 @@ int rewriteAppendOnlyFile(char *filename) {

if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
2 changes: 1 addition & 1 deletion src/db.c
@@ -413,7 +413,7 @@ void flushallCommand(client *c) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSave(server.rdb_filename);
rdbSave(server.rdb_filename,NULL);
server.dirty = saved_dirty;
}
server.dirty++;
4 changes: 2 additions & 2 deletions src/debug.c
@@ -320,12 +320,12 @@ void debugCommand(client *c) {
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
serverAssertWithInfo(c,c->argv[0],1 == 2);
} else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
if (rdbSave(server.rdb_filename) != C_OK) {
if (rdbSave(server.rdb_filename,NULL) != C_OK) {
addReply(c,shared.err);
return;
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (rdbLoad(server.rdb_filename) != C_OK) {
if (rdbLoad(server.rdb_filename,NULL) != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
return;
}
26 changes: 24 additions & 2 deletions src/networking.c
@@ -352,6 +352,14 @@ void addReplySds(client *c, sds s) {
}
}

/* This low level function just adds whatever protocol you send it to the
* client buffer, trying the static buffer initially, and using the string
* of objects if not possible.
*
* It is efficient because it does not create an SDS object nor a Redis object
* if not needed. The object will only be created by calling
* _addReplyStringToList() if we fail to extend the existing tail object
* in the list of objects. */
void addReplyString(client *c, const char *s, size_t len) {
if (prepareClientToWrite(c) != C_OK) return;
if (_addReplyToBuffer(c,s,len) != C_OK)
@@ -1022,7 +1030,7 @@ int processInlineBuffer(client *c) {
char *newline;
int argc, j;
sds *argv, aux;
size_t querylen;
size_t querylen, protolen;

/* Search for end of line */
newline = strchr(c->querybuf,'\n');
@@ -1035,6 +1043,7 @@ int processInlineBuffer(client *c) {
}
return C_ERR;
}
protolen = (newline - c->querybuf)+1; /* Total protocol bytes of command. */

/* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r')
@@ -1057,6 +1066,15 @@ int processInlineBuffer(client *c) {
if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = server.unixtime;

/* Newline from masters can be used to prevent timeouts, but should
* not affect the replication offset since they are always sent
* "out of band" directly writing to the socket and without passing
* from the output buffers. */
if (querylen == 0 && c->flags & CLIENT_MASTER) {
c->reploff -= protolen;
while (protolen--) chopReplicationBacklog();
}

/* Leave data after the first line of the query in the buffer */
sdsrange(c->querybuf,querylen+2,-1);
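
The hunk above keeps the replication offset unchanged when a master sends a bare newline as a keepalive. A minimal sketch of that accounting in C follows. `master_link_sketch`, `on_master_read()` and `on_inline_line()` are hypothetical names used only for illustration, and the backlog trimming that the commit performs with `chopReplicationBacklog()` is left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical view of the link to the master (name is an assumption). */
typedef struct {
    long long reploff; /* Replication offset as seen by this slave. */
} master_link_sketch;

/* Every byte read from the master is first counted in the offset. */
static void on_master_read(master_link_sketch *m, size_t nread) {
    assert(m != NULL);
    m->reploff += (long long)nread;
}

/* Called for one complete inline line, terminator included. If the line
 * carries no payload ("\n" or "\r\n"), it was sent out of band, so the
 * bytes counted by on_master_read() are subtracted again.
 * Returns 1 when the line was an out-of-band keepalive, 0 otherwise. */
static int on_inline_line(master_link_sketch *m, const char *line,
                          size_t protolen)
{
    assert(m != NULL && line != NULL);
    size_t querylen = protolen;
    if (querylen && line[querylen-1] == '\n') querylen--;
    if (querylen && line[querylen-1] == '\r') querylen--;
    if (querylen == 0) {
        m->reploff -= (long long)protolen;
        return 1;
    }
    return 0;
}
```

The point of the subtraction is that master and slave must agree on the offset of every byte in the replication stream, so bytes that never passed through the master's output buffers must not be counted on the receiving side.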

@@ -1321,7 +1339,11 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {

sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->reploff += nread;
if (c->flags & CLIENT_MASTER) {
c->reploff += nread;
replicationFeedSlavesFromMasterStream(server.slaves,
c->querybuf+qblen,nread);
}
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
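
Since slaves now accumulate a backlog from the very same bytes they read from their master, the backlog can be pictured as a fixed-size circular buffer plus a running offset. The sketch below is an illustration in C under that assumption, not the Redis implementation: `backlog_sketch`, `backlog_feed()`, `backlog_first_offset()` and the tiny `BACKLOG_SIZE` are all hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BACKLOG_SIZE 16 /* Deliberately tiny, for illustration only. */

typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;       /* Next write position inside buf. */
    size_t histlen;   /* Number of valid bytes currently stored. */
    long long offset; /* Replication offset of the last byte written. */
} backlog_sketch;

/* Append 'len' bytes of the replication stream, overwriting the oldest
 * data once the buffer is full. */
static void backlog_feed(backlog_sketch *b, const char *p, size_t len) {
    assert(b != NULL && p != NULL);
    b->offset += (long long)len;
    while (len) {
        size_t chunk = BACKLOG_SIZE - b->idx; /* Room before wrapping. */
        if (chunk > len) chunk = len;
        memcpy(b->buf + b->idx, p, chunk);
        b->idx = (b->idx + chunk) % BACKLOG_SIZE;
        b->histlen += chunk;
        p += chunk;
        len -= chunk;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
}

/* Offset of the oldest byte that can still be served from the backlog. */
static long long backlog_first_offset(const backlog_sketch *b) {
    assert(b != NULL);
    return b->offset - (long long)b->histlen + 1;
}
```

If the master, its slaves and their sub-slaves all feed such a buffer with identical bytes, then a given offset refers to the same byte on every instance, which is what allows a promoted slave to continue serving the stream to the other slaves.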
49 changes: 32 additions & 17 deletions src/rdb.c
@@ -835,7 +835,7 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}

/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags) {
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;

@@ -844,6 +844,16 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;

/* Handle saving options that generate aux fields. */
if (rsi) {
if (rsi->repl_stream_db &&
rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
== -1)
{
return -1;
}
}
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
return 1;
}
@@ -856,7 +866,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int flags) {
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@@ -869,7 +879,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;

for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -945,15 +955,15 @@ int rdbSaveRio(rio *rdb, int *error, int flags) {
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];

getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;

@@ -964,7 +974,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
}

/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename) {
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
@@ -985,7 +995,7 @@ int rdbSave(char *filename) {
}

rioInitWithFile(&rdb,fp);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@@ -1023,7 +1033,7 @@ int rdbSave(char *filename) {
return C_ERR;
}

int rdbSaveBackground(char *filename) {
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;

@@ -1040,7 +1050,7 @@ int rdbSaveBackground(char *filename) {
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename);
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);

@@ -1410,7 +1420,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {

/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb) {
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@@ -1501,6 +1511,8 @@ int rdbLoadRio(rio *rdb) {
serverLog(LL_NOTICE,"RDB '%s': %s",
(char*)auxkey->ptr,
(char*)auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
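
The loading side of the new `repl-stream-db` auxiliary field can be sketched in isolation as follows. This is a hedged illustration in C rather than the commit's code: `rsi_sketch` and `apply_aux_field()` are hypothetical names, the real loader works on Redis string objects instead of C strings, and the sketch uses an exact `strcmp()` match and `strtol()` where the diff uses `strcasecmp()` and `atoi()`.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the save/load info structure. */
typedef struct {
    int repl_stream_db; /* DB selected in the replication stream. */
} rsi_sketch;

/* Handle one auxiliary key/value pair read from an RDB file.
 * Returns 1 if the field was understood and stored, 0 if it was ignored.
 * Unknown fields are ignored rather than treated as errors, so that newer
 * files remain loadable by code that does not know about them. */
static int apply_aux_field(rsi_sketch *rsi, const char *key, const char *val) {
    assert(key != NULL && val != NULL);
    if (strcmp(key, "repl-stream-db") == 0) {
        if (rsi == NULL) return 0; /* Caller is not interested. */
        char *end;
        long db = strtol(val, &end, 10);
        if (end == val || *end != '\0') return 0; /* Not a number. */
        rsi->repl_stream_db = (int)db;
        return 1;
    }
    return 0;
}
```

Passing a NULL structure mirrors the many call sites in this diff that pass `NULL` as the last argument: the field is simply skipped when the caller does not need replication information.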
@@ -1559,16 +1571,19 @@ int rdbLoadRio(rio *rdb) {
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
* filename is open for reading and a rio stream object created in order
* to do the actual loading. Moreover the ETA displayed in the INFO
* output is initialized and finalized. */
int rdbLoad(char *filename) {
* output is initialized and finalized.
*
* If you pass an 'rsi' structure initialized with RDB_SAVE_OPTION_INIT, the
* loading code will fill the information fields in the structure. */
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
FILE *fp;
rio rdb;
int retval;

if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb);
retval = rdbLoadRio(&rdb,rsi);
fclose(fp);
stopLoading();
return retval;
@@ -1721,7 +1736,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {

/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(void) {
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
int *fds;
uint64_t *clientids;
int numfds;
@@ -1779,7 +1794,7 @@ int rdbSaveToSlavesSockets(void) {
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves");

retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0)
retval = C_ERR;

@@ -1884,7 +1899,7 @@ void saveCommand(client *c) {
addReplyError(c,"Background save already in progress");
return;
}
if (rdbSave(server.rdb_filename) == C_OK) {
if (rdbSave(server.rdb_filename,NULL) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
@@ -1918,7 +1933,7 @@ void bgsaveCommand(client *c) {
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenver "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename) == C_OK) {
} else if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
10 changes: 5 additions & 5 deletions src/rdb.h
@@ -118,11 +118,11 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename);
int rdbSaveBackground(char *filename);
int rdbSaveToSlavesSockets(void);
int rdbLoad(char *filename, rdbSaveInfo *rsi);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename);
int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o);
size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb);
@@ -136,6 +136,6 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb);
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi);

#endif