Skip to content

Commit

Permalink
Rewrite quicklist
Browse files Browse the repository at this point in the history
Why:
I tried to solve the issue I found earlier, but found myself stuck in a quagmire
because new issues kept coming up while I was fixing the old ones, so I finally
decided to rewrite it.

Issues with the old one:
- A node which should be compressed stays raw
  This is due to the poor design of quicklist->recompress: the design overlooked
  the situation where a node stays uncompressed because it cannot be compressed
  small enough. If we then change the node, we should try to compress it again.
  See issue redis#12563.
- Iterators don't behave like iterators
  An iterator is reset and not available for further use after a replace or
  insert; see the macro resetIterator(). The only operation after which the
  iterator is not reset is quicklistDelEntry(), and it has a comment about it,
  but the comment is wrong: the iterator may not behave as the comment says.
  See issue redis#12614.
- Packed nodes violate the size limit
  Certain calls to quicklistReplaceEntry(), quicklistInsertBefore() and
  quicklistInsertAfter() will cause a packed node to violate the size limit.
  See issue redis#12548.
- Merge operation is only performed on insert
  There is no merging on delete or replace, which can leave the quicklist
  containing adjacent small nodes.
  See issue redis#12856.
- Algorithms that maintain compress depth are not efficient
  The algorithm that maintains compress depth after an add or delete checks
  nodes on both sides of the list, so its time complexity is O(n), where n is
  the uncompressed depth on both sides of the list.

All the changes:
- Partition the nodes
  Divide the nodes into three partitions: head, middle and tail. The head and
  tail partitions hold uncompressed nodes, and the middle partition holds
  compressed nodes. Therefore, the time complexity of maintaining compress depth
  after adding or deleting a node drops to O(1), moving at most one node
  from one partition to another.
- Removed annoying members recompress, attempted_compress and dont_compress from
  quicklist node structure
- Merge structure quicklistIter and quicklistEntry
- The historical parameter packed_threshold has been removed
  This is mentioned by @sundb and @oranagra in pull request redis#12568.
- Merge strategy is added
  That is, no two adjacent nodes in the quicklist are left in a state where
  they could be merged.
  • Loading branch information
imchuncai committed Dec 12, 2023
1 parent c85a9b7 commit fa2a707
Show file tree
Hide file tree
Showing 13 changed files with 6,069 additions and 5,119 deletions.
5 changes: 2 additions & 3 deletions src/aof.c
Expand Up @@ -1802,8 +1802,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
long long count = 0, items = listTypeLength(o);

listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
listTypeEntry entry;
while (listTypeNext(li,&entry)) {
while (listTypeNext(li)) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
Expand All @@ -1819,7 +1818,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
unsigned char *vstr;
size_t vlen;
long long lval;
vstr = listTypeGetValue(&entry,&vlen,&lval);
vstr = listTypeGetValue(li,&vlen,&lval);
if (vstr) {
if (!rioWriteBulkString(r,(char*)vstr,vlen)) {
listTypeReleaseIterator(li);
Expand Down
41 changes: 17 additions & 24 deletions src/debug.c
Expand Up @@ -154,9 +154,8 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o)
mixStringObjectDigest(digest,o);
} else if (o->type == OBJ_LIST) {
listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
listTypeEntry entry;
while(listTypeNext(li,&entry)) {
robj *eleobj = listTypeGet(&entry);
while(listTypeNext(li)) {
robj *eleobj = listTypeGet(li);
mixStringObjectDigest(digest,eleobj);
decrRefCount(eleobj);
}
Expand Down Expand Up @@ -470,9 +469,6 @@ void debugCommand(client *c) {
" Setting it to 0 disables expiring keys in background when they are not",
" accessed (otherwise the Redis behavior). Setting it to 1 reenables back the",
" default.",
"QUICKLIST-PACKED-THRESHOLD <size>",
" Sets the threshold for elements to be inserted as plain vs packed nodes",
" Default value is 1GB, allows values up to 4GB. Setting to 0 restores to default.",
"SET-SKIP-CHECKSUM-VALIDATION <0|1>",
" Enables or disables checksum checks for RDB files and RESTORE's payload.",
"SLEEP <seconds>",
Expand Down Expand Up @@ -617,29 +613,36 @@ NULL
if (val->encoding == OBJ_ENCODING_QUICKLIST) {
char *nextra = extra;
int remaining = sizeof(extra);
quicklist *ql = val->ptr;
struct quicklist *ql = val->ptr;
/* Add number of quicklist nodes */
int used = snprintf(nextra, remaining, " ql_nodes:%lu", ql->len);
int used = snprintf(nextra, remaining, " ql_nodes:%lu", quicklist_node_count(ql));
nextra += used;
remaining -= used;
/* Add average quicklist fill factor */
double avg = (double)ql->count/ql->len;
double avg = (double)ql->count/quicklist_node_count(ql);
used = snprintf(nextra, remaining, " ql_avg_node:%.2f", avg);
nextra += used;
remaining -= used;
/* Add quicklist fill level / max listpack size */
used = snprintf(nextra, remaining, " ql_listpack_max:%d", ql->fill);
used = snprintf(nextra, remaining, " ql_pack_max_count:%d", ql->fill->pack_max_count);
nextra += used;
remaining -= used;
used = snprintf(nextra, remaining, " ql_pack_max_size:%d", ql->fill->pack_max_size);
nextra += used;
remaining -= used;
/* Add isCompressed? */
int compressed = ql->compress != 0;
int compressed = ql->head->next->capacity != 0;
used = snprintf(nextra, remaining, " ql_compressed:%d", compressed);
nextra += used;
remaining -= used;
/* Add total uncompressed size */
unsigned long sz = 0;
for (quicklistNode *node = ql->head; node; node = node->next) {
sz += node->sz;
struct quicklist_partition *p;
struct quicklist_node *node;
quicklist_first_node(ql, &p, &node);
while (node) {
sz += node->raw_sz;
quicklist_next(p, node, &p, &node);
}
used = snprintf(nextra, remaining, " ql_uncompressed_size:%lu", sz);
nextra += used;
Expand Down Expand Up @@ -702,7 +705,7 @@ NULL
if (o->encoding != OBJ_ENCODING_QUICKLIST) {
addReplyError(c,"Not a quicklist encoded object.");
} else {
quicklistRepr(o->ptr, full);
quicklist_debug_print((struct quicklist *)o->ptr, full);
addReplyStatus(c,"Quicklist structure printed on stdout");
}
} else if (!strcasecmp(c->argv[1]->ptr,"populate") &&
Expand Down Expand Up @@ -850,16 +853,6 @@ NULL
{
server.active_expire_enabled = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"quicklist-packed-threshold") &&
c->argc == 3)
{
int memerr;
unsigned long long sz = memtoull((const char *)c->argv[2]->ptr, &memerr);
if (memerr || !quicklistisSetPackedThreshold(sz)) {
addReplyError(c, "argument must be a memory value bigger than 1 and smaller than 4gb");
} else {
addReply(c,shared.ok);
}
} else if (!strcasecmp(c->argv[1]->ptr,"set-skip-checksum-validation") &&
c->argc == 3)
{
Expand Down
51 changes: 24 additions & 27 deletions src/defrag.c
Expand Up @@ -296,29 +296,25 @@ void activeDefragList(list *l, int val_type) {
}
}

void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
void activeDefragQuickListNode(struct quicklist_node **node_ref) {
struct quicklist_node *newnode, *node = *node_ref;
unsigned char *newzl;
if ((newnode = activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
newnode->prev->next = newnode;
newnode->next->prev = newnode;
*node_ref = node = newnode;
}
if ((newzl = activeDefragAlloc(node->entry)))
node->entry = newzl;
if ((newzl = activeDefragAlloc(node->carry)))
node->carry = newzl;
}

void activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head;
void activeDefragQuickListNodes(struct quicklist *ql) {
struct quicklist_partition *p;
struct quicklist_node *node;
quicklist_first_node(ql, &p, &node);
while (node) {
activeDefragQuickListNode(ql, &node);
node = node->next;
activeDefragQuickListNode(&node);
quicklist_next(p, node, &p, &node);
}
}

Expand All @@ -332,33 +328,34 @@ void defragLater(redisDb *db, dictEntry *kde) {

/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) {
quicklist *ql = ob->ptr;
quicklistNode *node;
struct quicklist *ql = ob->ptr;
struct quicklist_partition *p;
struct quicklist_node *node;
long iterations = 0;
int bookmark_failed = 0;
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
return 0;

if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
node = ql->head;
quicklist_first_node(ql, &p, &node);
} else {
node = quicklistBookmarkFind(ql, "_AD");
node = quicklist_bm_find(ql, "_AD");
if (!node) {
/* if the bookmark was deleted, it means we reached the end. */
*cursor = 0;
return 0;
}
node = node->next;
node = quicklist_next_for_bookmark(ql, node);
}

(*cursor)++;
while (node) {
activeDefragQuickListNode(ql, &node);
activeDefragQuickListNode(&node);
server.stat_active_defrag_scanned++;
if (++iterations > 128 && !bookmark_failed) {
if (ustime() > endtime) {
if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
if (!quicklist_bm_create(&ql, "_AD", node)) {
bookmark_failed = 1;
} else {
ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
Expand All @@ -367,9 +364,9 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) {
}
iterations = 0;
}
node = node->next;
node = quicklist_next_for_bookmark(ql, node);
}
quicklistBookmarkDelete(ql, "_AD");
quicklist_bm_delete(ql, "_AD");
*cursor = 0;
return bookmark_failed? 1: 0;
}
Expand Down Expand Up @@ -427,11 +424,11 @@ void scanLaterHash(robj *ob, unsigned long *cursor) {

void defragQuicklist(redisDb *db, dictEntry *kde) {
robj *ob = dictGetVal(kde);
quicklist *ql = ob->ptr, *newql;
struct quicklist *ql = ob->ptr, *newql;
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
if ((newql = activeDefragAlloc(ql)))
ob->ptr = ql = newql;
if (ql->len > server.active_defrag_max_scan_fields)
if ((unsigned long)quicklist_node_count(ql) > server.active_defrag_max_scan_fields)
defragLater(db, kde);
else
activeDefragQuickListNodes(ql);
Expand Down
4 changes: 2 additions & 2 deletions src/lazyfree.c
Expand Up @@ -108,8 +108,8 @@ void lazyfreeResetStats(void) {
* representing the list. */
size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
if (obj->type == OBJ_LIST && obj->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = obj->ptr;
return ql->len;
struct quicklist *ql = obj->ptr;
return quicklist_node_count(ql);
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
Expand Down
29 changes: 18 additions & 11 deletions src/module.c
Expand Up @@ -197,7 +197,6 @@ struct RedisModuleKey {
union {
struct {
/* List, use only if value->type == OBJ_LIST */
listTypeEntry entry; /* Current entry in iteration. */
long index; /* Current 0-based index in iteration. */
} list;
struct {
Expand Down Expand Up @@ -4438,7 +4437,7 @@ int moduleListIteratorSeek(RedisModuleKey *key, long index, int mode) {
/* No existing iterator. Create one. */
key->iter = listTypeInitIterator(key->value, index, LIST_TAIL);
serverAssert(key->iter != NULL);
serverAssert(listTypeNext(key->iter, &key->u.list.entry));
serverAssert(listTypeNext(key->iter));
key->u.list.index = index;
return 1;
}
Expand All @@ -4452,9 +4451,17 @@ int moduleListIteratorSeek(RedisModuleKey *key, long index, int mode) {

/* Seek the iterator to the requested index. */
unsigned char dir = key->u.list.index < index ? LIST_TAIL : LIST_HEAD;
listTypeSetIteratorDirection(key->iter, &key->u.list.entry, dir);
listTypeIterator *li = key->iter;
if (dir != li->direction) {
listTypeReleaseIterator(li);
key->iter = listTypeInitIterator(key->value, index, dir);
serverAssert(key->iter != NULL);
serverAssert(listTypeNext(key->iter));
key->u.list.index = index;
return 1;
}
while (key->u.list.index != index) {
serverAssert(listTypeNext(key->iter, &key->u.list.entry));
serverAssert(listTypeNext(key->iter));
key->u.list.index += dir == LIST_HEAD ? -1 : 1;
}
return 1;
Expand Down Expand Up @@ -4546,7 +4553,7 @@ RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) {
*/
RedisModuleString *RM_ListGet(RedisModuleKey *key, long index) {
if (moduleListIteratorSeek(key, index, REDISMODULE_READ)) {
robj *elem = listTypeGet(&key->u.list.entry);
robj *elem = listTypeGet(key->iter);
robj *decoded = getDecodedObject(elem);
decrRefCount(elem);
autoMemoryAdd(key->ctx, REDISMODULE_AM_STRING, decoded);
Expand Down Expand Up @@ -4582,7 +4589,7 @@ int RM_ListSet(RedisModuleKey *key, long index, RedisModuleString *value) {
}
listTypeTryConversionAppend(key->value, &value, 0, 0, moduleFreeListIterator, key);
if (moduleListIteratorSeek(key, index, REDISMODULE_WRITE)) {
listTypeReplace(&key->u.list.entry, value);
listTypeReplace(key->iter, value);
/* A note in quicklist.c forbids use of iterator after insert, so
* probably also after replace. */
moduleFreeKeyIterator(key);
Expand Down Expand Up @@ -4629,7 +4636,7 @@ int RM_ListInsert(RedisModuleKey *key, long index, RedisModuleString *value) {
listTypeTryConversionAppend(key->value, &value, 0, 0, moduleFreeListIterator, key);
if (moduleListIteratorSeek(key, index, REDISMODULE_WRITE)) {
int where = index < 0 ? LIST_TAIL : LIST_HEAD;
listTypeInsert(&key->u.list.entry, value, where);
listTypeInsert(key->iter, value, where);
/* A note in quicklist.c forbids use of iterator after insert. */
moduleFreeKeyIterator(key);
return REDISMODULE_OK;
Expand All @@ -4651,19 +4658,19 @@ int RM_ListInsert(RedisModuleKey *key, long index, RedisModuleString *value) {
*/
int RM_ListDelete(RedisModuleKey *key, long index) {
if (moduleListIteratorSeek(key, index, REDISMODULE_WRITE)) {
listTypeDelete(key->iter, &key->u.list.entry);
listTypeDelete(key->iter);
if (moduleDelKeyIfEmpty(key)) return REDISMODULE_OK;
listTypeTryConversion(key->value, LIST_CONV_SHRINKING, moduleFreeListIterator, key);
if (!key->iter) return REDISMODULE_OK; /* Return ASAP if iterator has been freed */
if (listTypeNext(key->iter, &key->u.list.entry)) {
listTypeIterator *iter = key->iter;
if (listTypeNext(iter)) {
/* After delete entry at position 'index', we need to update
* 'key->u.list.index' according to the following cases:
* 1) [1, 2, 3] => dir: forward, index: 0 => [2, 3] => index: still 0
* 2) [1, 2, 3] => dir: forward, index: -3 => [2, 3] => index: -2
* 3) [1, 2, 3] => dir: reverse, index: 2 => [1, 2] => index: 1
* 4) [1, 2, 3] => dir: reverse, index: -1 => [1, 2] => index: still -1 */
listTypeIterator *li = key->iter;
int reverse = li->direction == LIST_HEAD;
int reverse = iter->direction == LIST_HEAD;
if (key->u.list.index < 0)
key->u.list.index += reverse ? 0 : 1;
else
Expand Down
37 changes: 21 additions & 16 deletions src/object.c
Expand Up @@ -233,7 +233,7 @@ robj *dupStringObject(const robj *o) {
}

robj *createQuicklistObject(void) {
quicklist *l = quicklistCreate();
struct quicklist *l = quicklist_new(server.list_max_listpack_size, server.list_compress_depth);
robj *o = createObject(OBJ_LIST,l);
o->encoding = OBJ_ENCODING_QUICKLIST;
return o;
Expand Down Expand Up @@ -314,7 +314,7 @@ void freeStringObject(robj *o) {

void freeListObject(robj *o) {
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistRelease(o->ptr);
quicklist_free(o->ptr);
} else if (o->encoding == OBJ_ENCODING_LISTPACK) {
lpFree(o->ptr);
} else {
Expand Down Expand Up @@ -423,19 +423,21 @@ void dismissStringObject(robj *o) {
/* See dismissObject() */
void dismissListObject(robj *o, size_t size_hint) {
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
serverAssert(ql->len != 0);
struct quicklist *ql = o->ptr;
serverAssert(quicklist_node_count(ql) != 0);
/* We iterate all nodes only when average node size is bigger than a
* page size, and there's a high chance we'll actually dismiss something. */
if (size_hint / ql->len >= server.page_size) {
quicklistNode *node = ql->head;
if (size_hint / quicklist_node_count(ql) >= server.page_size) {
struct quicklist_partition *p;
struct quicklist_node *node;
quicklist_first_node(ql, &p, &node);
while (node) {
if (quicklistNodeIsCompressed(node)) {
dismissMemory(node->entry, ((quicklistLZF*)node->entry)->sz);
if (!node->raw) {
dismissMemory(node->carry, ((struct quicklist_lzf *)node->carry)->sz);
} else {
dismissMemory(node->entry, node->sz);
dismissMemory(node->carry, node->raw_sz);
}
node = node->next;
quicklist_next(p, node, &p, &node);
}
}
} else if (o->encoding == OBJ_ENCODING_LISTPACK) {
Expand Down Expand Up @@ -1018,14 +1020,17 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
}
} else if (o->type == OBJ_LIST) {
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
quicklistNode *node = ql->head;
asize = sizeof(*o)+sizeof(quicklist);
struct quicklist *ql = o->ptr;
struct quicklist_partition *p;
struct quicklist_node *node;
quicklist_first_node(ql, &p, &node);
asize = sizeof(*o)+sizeof(struct quicklist);
do {
elesize += sizeof(quicklistNode)+zmalloc_size(node->entry);
elesize += sizeof(struct quicklist_node)+zmalloc_size(node->carry);
samples++;
} while ((node = node->next) && samples < sample_size);
asize += (double)elesize/samples*ql->len;
quicklist_next(p, node, &p, &node);
} while (node && samples < sample_size);
asize += (double)elesize/samples*quicklist_node_count(ql);
} else if (o->encoding == OBJ_ENCODING_LISTPACK) {
asize = sizeof(*o)+zmalloc_size(o->ptr);
} else {
Expand Down

0 comments on commit fa2a707

Please sign in to comment.