diff --git a/src/replication.c b/src/replication.c index 33bb8242c4f6..854688f2c814 100644 --- a/src/replication.c +++ b/src/replication.c @@ -335,86 +335,94 @@ void feedReplicationBuffer(char *s, size_t len) { server.master_repl_offset += len; server.repl_backlog->histlen += len; - size_t start_pos = 0; /* The position of referenced block to start sending. */ - listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ - int add_new_block = 0; /* Create new block if current block is total used. */ - listNode *ln = listLast(server.repl_buffer_blocks); - replBufBlock *tail = ln ? listNodeValue(ln) : NULL; - - /* Append to tail string when possible. */ - if (tail && tail->size > tail->used) { - start_node = listLast(server.repl_buffer_blocks); - start_pos = tail->used; - /* Copy the part we can fit into the tail, and leave the rest for a - * new node */ - size_t avail = tail->size - tail->used; - size_t copy = (avail >= len) ? len : avail; - memcpy(tail->buf + tail->used, s, copy); - tail->used += copy; - s += copy; - len -= copy; - } - if (len) { - /* Create a new node, make sure it is allocated to at - * least PROTO_REPLY_CHUNK_BYTES */ - size_t usable_size; - size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len; - tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size); - /* Take over the allocation's internal fragmentation */ - tail->size = usable_size - sizeof(replBufBlock); - tail->used = len; - tail->refcount = 0; - tail->repl_offset = server.master_repl_offset - tail->used + 1; - tail->id = repl_block_id++; - memcpy(tail->buf, s, len); - listAddNodeTail(server.repl_buffer_blocks, tail); - /* We also count the list node memory into replication buffer memory. */ - server.repl_buffer_mem += (usable_size + sizeof(listNode)); - add_new_block = 1; - if (start_node == NULL) { + while(len > 0) { + size_t start_pos = 0; /* The position of referenced block to start sending. 
*/ + listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ + int add_new_block = 0; /* Create new block if current block is total used. */ + listNode *ln = listLast(server.repl_buffer_blocks); + replBufBlock *tail = ln ? listNodeValue(ln) : NULL; + + /* Append to tail string when possible. */ + if (tail && tail->size > tail->used) { start_node = listLast(server.repl_buffer_blocks); - start_pos = 0; + start_pos = tail->used; + /* Copy the part we can fit into the tail, and leave the rest for a + * new node */ + size_t avail = tail->size - tail->used; + size_t copy = (avail >= len) ? len : avail; + memcpy(tail->buf + tail->used, s, copy); + tail->used += copy; + s += copy; + len -= copy; + } + if (len) { + /* Create a new node, make sure it is allocated to at + * least PROTO_REPLY_CHUNK_BYTES */ + size_t usable_size; + /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them, + * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't + * trim when we only still need to hold a small portion from them. */ + size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), (size_t)server.repl_backlog_size / 16); + tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size); + /* Take over the allocation's internal fragmentation */ + tail->size = usable_size - sizeof(replBufBlock); + size_t copy = (tail->size >= len) ? len : tail->size; + tail->used = copy; + tail->refcount = 0; + tail->repl_offset = server.master_repl_offset - tail->used + 1; + tail->id = repl_block_id++; + memcpy(tail->buf, s, copy); + listAddNodeTail(server.repl_buffer_blocks, tail); + /* We also count the list node memory into replication buffer memory. 
*/ + server.repl_buffer_mem += (usable_size + sizeof(listNode)); + add_new_block = 1; + if (start_node == NULL) { + start_node = listLast(server.repl_buffer_blocks); + start_pos = 0; + } + s += copy; + len -= copy; } - } - /* For output buffer of replicas. */ - listIter li; - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - if (!canFeedReplicaReplBuffer(slave)) continue; + /* For output buffer of replicas. */ + listIter li; + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; + + /* Update shared replication buffer start position. */ + if (slave->ref_repl_buf_node == NULL) { + slave->ref_repl_buf_node = start_node; + slave->ref_block_pos = start_pos; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(start_node))->refcount++; + } - /* Update shared replication buffer start position. */ - if (slave->ref_repl_buf_node == NULL) { - slave->ref_repl_buf_node = start_node; - slave->ref_block_pos = start_pos; - /* Only increase the start block reference count. */ - ((replBufBlock *)listNodeValue(start_node))->refcount++; + /* Check output buffer limit only when add new block. */ + if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1); } - /* Check output buffer limit only when add new block. */ - if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1); - } - - /* For replication backlog */ - if (server.repl_backlog->ref_repl_buf_node == NULL) { - server.repl_backlog->ref_repl_buf_node = start_node; - /* Only increase the start block reference count. */ - ((replBufBlock *)listNodeValue(start_node))->refcount++; + /* For replication backlog */ + if (server.repl_backlog->ref_repl_buf_node == NULL) { + server.repl_backlog->ref_repl_buf_node = start_node; + /* Only increase the start block reference count. 
*/ + ((replBufBlock *)listNodeValue(start_node))->refcount++; - /* Replication buffer must be empty before adding replication stream - * into replication backlog. */ - serverAssert(add_new_block == 1 && start_pos == 0); - } - if (add_new_block) { - createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); + /* Replication buffer must be empty before adding replication stream + * into replication backlog. */ + serverAssert(add_new_block == 1 && start_pos == 0); + } + if (add_new_block) { + createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); + } + /* Try to trim replication backlog since replication backlog may exceed + * our setting when we add replication stream. Note that it is important to + * try to trim at least one node since in the common case this is where one + * new backlog node is added and one should be removed. See also comments + * in freeMemoryGetNotCountedMemory for details. */ + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } - /* Try to trim replication backlog since replication backlog may exceed - * our setting when we add replication stream. Note that it is important to - * try to trim at least one node since in the common case this is where one - * new backlog node is added and one should be removed. See also comments - * in freeMemoryGetNotCountedMemory for details. */ - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } /* Propagate write commands to replication stream. 
diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl index 07e2cbc1b4bd..fe85632ae14d 100644 --- a/tests/integration/replication-buffer.tcl +++ b/tests/integration/replication-buffer.tcl @@ -180,6 +180,8 @@ start_server {} { } exec kill -SIGCONT $replica2_pid } + # speed up termination + $master config set shutdown-timeout 0 } } } @@ -227,3 +229,58 @@ test {Partial resynchronization is successful even client-output-buffer-limit is } } } + +# This test was added to make sure big keys added to the backlog do not trigger psync loop. +test {Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending} { + proc client_field {r type f} { + set client [$r client list type $type] + if {![regexp $f=(\[a-zA-Z0-9-\]+) $client - res]} { + error "field $f not found for in $client" + } + return $res + } + + start_server {tags {"repl external:skip"}} { + start_server {} { + set replica [srv -1 client] + set replica_host [srv -1 host] + set replica_port [srv -1 port] + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + $master config set repl-backlog-size 16384 + $master config set client-output-buffer-limit "replica 32768 32768 60" + # Key has to be larger than replica client-output-buffer limit. 
+ set keysize [expr 256*1024] + + $replica replicaof $master_host $master_port + wait_for_condition 50 100 { + [lindex [$replica role] 0] eq {slave} && + [string match {*master_link_status:up*} [$replica info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + set _v [prepare_value $keysize] + $master set key $_v + wait_for_ofs_sync $master $replica + + # Write another key to force the test to wait for another event loop iteration + # to give the serverCron a chance to disconnect replicas with COB size exceeding the limits + $master set key1 "1" + wait_for_ofs_sync $master $replica + + assert {[status $master connected_slaves] == 1} + + wait_for_condition 50 100 { + [client_field $master replica tot-mem] < $keysize + } else { + fail "replica client-output-buffer usage is higher than expected." + } + + assert {[status $master sync_partial_ok] == 0} + } + } +} +