Skip to content

Commit

Permalink
Large blocks of replica client output buffer could lead to psync loop…
Browse files Browse the repository at this point in the history
…s and unnecessary memory usage (redis#11666)

This can happen when a key almost equal or larger than the
client output buffer limit of the replica is written.

Example:
1. DB is empty
2. Backlog size is 1 MB
3. Client out put buffer limit is 2 MB
4. Client writes a 3 MB key
5. The shared replication buffer will have a single node which contains
the key written above, and it exceeds the backlog size.

At this point the client output buffer usage calculation will report the
replica buffer to be 3 MB (or more) even after sending all the data to
the replica.
The primary drops the replica connection for exceeding the limits,
the replica reconnects and successfully executes partial sync but the
primary will drop the connection again because the buffer usage is still
3 MB. This happens over and over.

To mitigate the problem, this fix limits the maximum size of a single
backlog node to be (repl_backlog_size/16). This way a single node can't
exceed the limits of the COB (the COB has to be larger than the
backlog).
It also means that if the backlog has some excessive data it can't trim,
it would be at most about 6% overuse.

other notes:
1. a loop was added in feedReplicationBuffer which caused a massive LOC
  change due to indentation, the actual changes are just the `min(max` and the loop.
3. an unrelated change in an existing test to speed up a server termination which took 10 seconds.

Co-authored-by: Oran Agra <oran@redislabs.com>
  • Loading branch information
2 people authored and enjoy-binbin committed Jul 31, 2023
1 parent 39cded5 commit 9393b45
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 72 deletions.
152 changes: 80 additions & 72 deletions src/replication.c
Expand Up @@ -335,86 +335,94 @@ void feedReplicationBuffer(char *s, size_t len) {
server.master_repl_offset += len;
server.repl_backlog->histlen += len;

size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
int add_new_block = 0; /* Create new block if current block is total used. */
listNode *ln = listLast(server.repl_buffer_blocks);
replBufBlock *tail = ln ? listNodeValue(ln) : NULL;

/* Append to tail string when possible. */
if (tail && tail->size > tail->used) {
start_node = listLast(server.repl_buffer_blocks);
start_pos = tail->used;
/* Copy the part we can fit into the tail, and leave the rest for a
* new node */
size_t avail = tail->size - tail->used;
size_t copy = (avail >= len) ? len : avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
}
if (len) {
/* Create a new node, make sure it is allocated to at
* least PROTO_REPLY_CHUNK_BYTES */
size_t usable_size;
size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
/* Take over the allocation's internal fragmentation */
tail->size = usable_size - sizeof(replBufBlock);
tail->used = len;
tail->refcount = 0;
tail->repl_offset = server.master_repl_offset - tail->used + 1;
tail->id = repl_block_id++;
memcpy(tail->buf, s, len);
listAddNodeTail(server.repl_buffer_blocks, tail);
/* We also count the list node memory into replication buffer memory. */
server.repl_buffer_mem += (usable_size + sizeof(listNode));
add_new_block = 1;
if (start_node == NULL) {
while(len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
int add_new_block = 0; /* Create new block if current block is total used. */
listNode *ln = listLast(server.repl_buffer_blocks);
replBufBlock *tail = ln ? listNodeValue(ln) : NULL;

/* Append to tail string when possible. */
if (tail && tail->size > tail->used) {
start_node = listLast(server.repl_buffer_blocks);
start_pos = 0;
start_pos = tail->used;
/* Copy the part we can fit into the tail, and leave the rest for a
* new node */
size_t avail = tail->size - tail->used;
size_t copy = (avail >= len) ? len : avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
}
if (len) {
/* Create a new node, make sure it is allocated to at
* least PROTO_REPLY_CHUNK_BYTES */
size_t usable_size;
/* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
* and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
* trim when we only still need to hold a small portion from them. */
size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), (size_t)server.repl_backlog_size / 16);
tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
/* Take over the allocation's internal fragmentation */
tail->size = usable_size - sizeof(replBufBlock);
size_t copy = (tail->size >= len) ? len : tail->size;
tail->used = copy;
tail->refcount = 0;
tail->repl_offset = server.master_repl_offset - tail->used + 1;
tail->id = repl_block_id++;
memcpy(tail->buf, s, copy);
listAddNodeTail(server.repl_buffer_blocks, tail);
/* We also count the list node memory into replication buffer memory. */
server.repl_buffer_mem += (usable_size + sizeof(listNode));
add_new_block = 1;
if (start_node == NULL) {
start_node = listLast(server.repl_buffer_blocks);
start_pos = 0;
}
s += copy;
len -= copy;
}
}

/* For output buffer of replicas. */
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
/* For output buffer of replicas. */
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;

/* Update shared replication buffer start position. */
if (slave->ref_repl_buf_node == NULL) {
slave->ref_repl_buf_node = start_node;
slave->ref_block_pos = start_pos;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;
}

/* Update shared replication buffer start position. */
if (slave->ref_repl_buf_node == NULL) {
slave->ref_repl_buf_node = start_node;
slave->ref_block_pos = start_pos;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;
/* Check output buffer limit only when add new block. */
if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
}

/* Check output buffer limit only when add new block. */
if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
}

/* For replication backlog */
if (server.repl_backlog->ref_repl_buf_node == NULL) {
server.repl_backlog->ref_repl_buf_node = start_node;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;
/* For replication backlog */
if (server.repl_backlog->ref_repl_buf_node == NULL) {
server.repl_backlog->ref_repl_buf_node = start_node;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;

/* Replication buffer must be empty before adding replication stream
* into replication backlog. */
serverAssert(add_new_block == 1 && start_pos == 0);
}
if (add_new_block) {
createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
/* Replication buffer must be empty before adding replication stream
* into replication backlog. */
serverAssert(add_new_block == 1 && start_pos == 0);
}
if (add_new_block) {
createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
}
/* Try to trim replication backlog since replication backlog may exceed
* our setting when we add replication stream. Note that it is important to
* try to trim at least one node since in the common case this is where one
* new backlog node is added and one should be removed. See also comments
* in freeMemoryGetNotCountedMemory for details. */
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
/* Try to trim replication backlog since replication backlog may exceed
* our setting when we add replication stream. Note that it is important to
* try to trim at least one node since in the common case this is where one
* new backlog node is added and one should be removed. See also comments
* in freeMemoryGetNotCountedMemory for details. */
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}

/* Propagate write commands to replication stream.
Expand Down
57 changes: 57 additions & 0 deletions tests/integration/replication-buffer.tcl
Expand Up @@ -180,6 +180,8 @@ start_server {} {
}
exec kill -SIGCONT $replica2_pid
}
# speed up termination
$master config set shutdown-timeout 0
}
}
}
Expand Down Expand Up @@ -227,3 +229,58 @@ test {Partial resynchronization is successful even client-output-buffer-limit is
}
}
}

# This test was added to make sure big keys added to the backlog do not trigger psync loop.
test {Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending} {
proc client_field {r type f} {
set client [$r client list type $type]
if {![regexp $f=(\[a-zA-Z0-9-\]+) $client - res]} {
error "field $f not found for in $client"
}
return $res
}

start_server {tags {"repl external:skip"}} {
start_server {} {
set replica [srv -1 client]
set replica_host [srv -1 host]
set replica_port [srv -1 port]
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]

$master config set repl-backlog-size 16384
$master config set client-output-buffer-limit "replica 32768 32768 60"
# Key has has to be larger than replica client-output-buffer limit.
set keysize [expr 256*1024]

$replica replicaof $master_host $master_port
wait_for_condition 50 100 {
[lindex [$replica role] 0] eq {slave} &&
[string match {*master_link_status:up*} [$replica info replication]]
} else {
fail "Can't turn the instance into a replica"
}

set _v [prepare_value $keysize]
$master set key $_v
wait_for_ofs_sync $master $replica

# Write another key to force the test to wait for another event loop iteration
# to give the serverCron a chance to disconnect replicas with COB size exeeeding the limits
$master set key1 "1"
wait_for_ofs_sync $master $replica

assert {[status $master connected_slaves] == 1}

wait_for_condition 50 100 {
[client_field $master replica tot-mem] < $keysize
} else {
fail "replica client-output-buffer usage is higher than expected."
}

assert {[status $master sync_partial_ok] == 0}
}
}
}

0 comments on commit 9393b45

Please sign in to comment.