Skip to content

Commit

Permalink
chore: improve xadd performance and remove redundant allocations (#1160)
Browse files Browse the repository at this point in the history
1. Incorporate StreamAppendItem into c++ codebase and stop using t_stream implementation.
2. Change its signature to accept CmdArgList instead of array of robj*.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange committed May 3, 2023
1 parent fa61b63 commit 842c1e4
Showing 1 changed file with 328 additions and 11 deletions.
339 changes: 328 additions & 11 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ const char kXGroupKeyNotFound[] =
"Note that for CREATE you may want to use the MKSTREAM option to create "
"an empty stream automatically.";

const uint32_t STREAM_LISTPACK_MAX_SIZE = 1 << 30;
const uint32_t kStreamNodeMaxBytes = 4096;
const uint32_t kStreamNodeMaxEntries = 100;
const uint32_t STREAM_LISTPACK_MAX_PRE_ALLOCATE = 4096;

/* Every stream item inside the listpack, has a flags field that is used to
* mark the entry as deleted, or having the same field as the "master"
* entry at the start of the listpack. */
const uint32_t STREAM_ITEM_FLAG_NONE = 0; /* No special flags. */
const uint32_t STREAM_ITEM_FLAG_DELETED = (1 << 0); /* Entry is deleted. Skip it. */
const uint32_t STREAM_ITEM_FLAG_SAMEFIELDS = (1 << 1); /* Same fields as master entry. */

inline string StreamIdRepr(const streamID& id) {
return absl::StrCat(id.ms, "-", id.seq);
};
Expand Down Expand Up @@ -138,6 +150,321 @@ bool ParseRangeId(string_view id, RangeId* dest) {
return ParseID(id, dest->exclude, 0, &dest->parsed_id);
}

/* This is a wrapper function for lpGet() to directly get an integer value
* from the listpack (that may store numbers as a string), converting
* the string if needed.
* The `valid` argument is an optional output parameter to get an indication
* if the record was valid, when this parameter is NULL, the function will
* fail with an assertion. */
static inline int64_t lpGetIntegerIfValid(unsigned char* ele, int* valid) {
int64_t v;
unsigned char* e = lpGet(ele, &v, NULL);
if (e == NULL) {
if (valid)
*valid = 1;
return v;
}
/* The following code path should never be used for how listpacks work:
* they should always be able to store an int64_t value in integer
* encoded form. However the implementation may change. */
long long ll;
int ret = string2ll((char*)e, v, &ll);
if (valid)
*valid = ret;
else
serverAssert(ret != 0);
v = ll;
return v;
}

int64_t lpGetInteger(unsigned char* ele) {
return lpGetIntegerIfValid(ele, NULL);
}

/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
* previous time (and never go backward) and increment the sequence. */
void StreamNextID(const streamID* last_id, streamID* new_id) {
uint64_t ms = mstime();
if (ms > last_id->ms) {
new_id->ms = ms;
new_id->seq = 0;
} else {
*new_id = *last_id;
streamIncrID(new_id);
}
}

/* Convert the specified stream entry ID as a 128 bit big endian number, so
* that the IDs can be sorted lexicographically. */
inline void StreamEncodeID(uint8_t* buf, streamID* id) {
absl::big_endian::Store64(buf, id->ms);
absl::big_endian::Store64(buf + 8, id->seq);
}

/* Adds a new item into the stream 's' having the specified number of
* field-value pairs as specified in 'numfields' and stored into 'argv'.
* Returns the new entry ID populating the 'added_id' structure.
*
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
* but instead the passed ID is used to add the new entry. In this case
* adding the entry may fail as specified later in this comment.
*
* When 'use_id' is used alongside with a zero 'seq-given', the sequence
* part of the passed ID is ignored and the function will attempt to use an
* auto-generated sequence.
*
* The function returns C_OK if the item was added, this is always true
* if the ID was generated by the function. However the function may return
* C_ERR in several cases:
* 1. If an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. errno will be set to EDOM.
* 2. If a size of a single element or the sum of the elements is too big to
* be stored into the stream. errno will be set to ERANGE. */
int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* use_id,
int seq_given) {
/* Generate the new entry ID. */
streamID id;
if (use_id) {
if (seq_given) {
id = *use_id;
} else {
/* The automatically generated sequence can be either zero (new
* timestamps) or the incremented sequence of the last ID. In the
* latter case, we need to prevent an overflow/advancing forward
* in time. */
if (s->last_id.ms == use_id->ms) {
if (s->last_id.seq == UINT64_MAX) {
return C_ERR;
}
id = s->last_id;
id.seq++;
} else {
id = *use_id;
}
}
} else {
StreamNextID(&s->last_id, &id);
}

/* Check that the new ID is greater than the last entry ID
* or return an error. Automatically generated IDs might
* overflow (and wrap-around) when incrementing the sequence
part. */
if (streamCompareID(&id, &s->last_id) <= 0) {
errno = EDOM;
return C_ERR;
}

/* Avoid overflow when trying to add an element to the stream (listpack
* can only host up to 32bit length sttrings, and also a total listpack size
* can't be bigger than 32bit length. */
size_t totelelen = 0;
for (size_t i = 0; i < fields.size(); i++) {
totelelen += fields[i].size();
}

if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
errno = ERANGE;
return C_ERR;
}

/* Add the new entry. */
raxIterator ri;
raxStart(&ri, s->rax_tree);
raxSeek(&ri, "$", NULL, 0);

size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
unsigned char* lp = NULL; /* Tail listpack pointer. */

if (!raxEOF(&ri)) {
/* Get a reference to the tail node listpack. */
lp = (uint8_t*)ri.data;
lp_bytes = lpBytes(lp);
}
raxStop(&ri);

/* We have to add the key into the radix tree in lexicographic order,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */
uint8_t rax_key[16]; /* Key in the radix tree containing the listpack.*/
streamID master_id; /* ID of the master entry in the listpack. */

/* Create a new listpack and radix tree node if needed. Note that when
* a new listpack is created, we populate it with a "master entry". This
* is just a set of fields that is taken as references in order to compress
* the stream entries that we'll add inside the listpack.
*
* Note that while we use the first added entry fields to create
* the master entry, the first added entry is NOT represented in the master
* entry, which is a stand alone object. But of course, the first entry
* will compress well because it's used as reference.
*
* The master entry is composed like in the following example:
*
* +-------+---------+------------+---------+--/--+---------+---------+-+
* | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
* +-------+---------+------------+---------+--/--+---------+---------+-+
*
* count and deleted just represent respectively the total number of
* entries inside the listpack that are valid, and marked as deleted
* (deleted flag in the entry flags set). So the total number of items
* actually inside the listpack (both deleted and not) is count+deleted.
*
* The real entries will be encoded with an ID that is just the
* millisecond and sequence difference compared to the key stored at
* the radix tree node containing the listpack (delta encoding), and
* if the fields of the entry are the same as the master entry fields, the
* entry flags will specify this fact and the entry fields and number
* of fields will be omitted (see later in the code of this function).
*
* The "0" entry at the end is the same as the 'lp-count' entry in the
* regular stream entries (see below), and marks the fact that there are
* no more entries, when we scan the stream from right to left. */

/* First of all, check if we can append to the current macro node or
* if we need to switch to the next one. 'lp' will be set to NULL if
* the current node is full. */
if (lp != NULL) {
size_t node_max_bytes = kStreamNodeMaxBytes;
if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
if (lp_bytes + totelelen >= node_max_bytes) {
lp = NULL;
} else if (kStreamNodeMaxEntries) {
unsigned char* lp_ele = lpFirst(lp);
/* Count both live entries and deleted ones. */
int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp, lp_ele));
if (count >= kStreamNodeMaxEntries) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax_tree, ri.key, ri.key_len, lp, NULL);
lp = NULL;
}
}
}

int flags = 0;
unsigned numfields = fields.size() / 2;
if (lp == NULL) {
master_id = id;
StreamEncodeID(rax_key, &id);
/* Create the listpack having the master entry ID and fields.
* Pre-allocate some bytes when creating listpack to avoid realloc on
* every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
* and won't realloc on every XADD.
* When listpack reaches max number of entries, we'll shrink the
* allocation to fit the data. */
size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;

lp = lpNew(prealloc);
lp = lpAppendInteger(lp, 1); /* One item, the one we are adding. */
lp = lpAppendInteger(lp, 0); /* Zero deleted so far. */
lp = lpAppendInteger(lp, numfields);
for (int64_t i = 0; i < numfields; i++) {
MutableSlice field = fields[i * 2];
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(field.data()), field.size());
}
lp = lpAppendInteger(lp, 0); /* Master entry zero terminator. */
raxInsert(s->rax_tree, (unsigned char*)&rax_key, sizeof(rax_key), lp, NULL);
/* The first entry we insert, has obviously the same fields of the
* master entry. */
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
} else {
serverAssert(ri.key_len == sizeof(rax_key));
memcpy(rax_key, ri.key, sizeof(rax_key));

/* Read the master ID from the radix tree key. */
streamDecodeID(rax_key, &master_id);
unsigned char* lp_ele = lpFirst(lp);

/* Update count and skip the deleted fields. */
int64_t count = lpGetInteger(lp_ele);
lp = lpReplaceInteger(lp, &lp_ele, count + 1);
lp_ele = lpNext(lp, lp_ele); /* seek deleted. */
lp_ele = lpNext(lp, lp_ele); /* seek master entry num fields. */

/* Check if the entry we are adding, have the same fields
* as the master entry. */
int64_t master_fields_count = lpGetInteger(lp_ele);
lp_ele = lpNext(lp, lp_ele);
if (numfields == master_fields_count) {
int64_t i;
for (i = 0; i < master_fields_count; i++) {
MutableSlice field = fields[i * 2];
int64_t e_len;
unsigned char buf[LP_INTBUF_SIZE];
unsigned char* e = lpGet(lp_ele, &e_len, buf);
/* Stop if there is a mismatch. */
if (field.size() != (size_t)e_len || memcmp(e, field.data(), e_len) != 0)
break;
lp_ele = lpNext(lp, lp_ele);
}
/* All fields are the same! We can compress the field names
* setting a single bit in the flags. */
if (i == master_fields_count)
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
}
}

/* Populate the listpack with the new entry. We use the following
* encoding:
*
* +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
* |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
* +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
*
* However if the SAMEFIELD flag is set, we have just to populate
* the entry with the values, so it becomes:
*
* +-----+--------+-------+-/-+-------+--------+
* |flags|entry-id|value-1|...|value-N|lp-count|
* +-----+--------+-------+-/-+-------+--------+
*
* The entry-id field is actually two separated fields: the ms
* and seq difference compared to the master entry.
*
* The lp-count field is a number that states the number of listpack pieces
* that compose the entry, so that it's possible to travel the entry
* in reverse order: we can just start from the end of the listpack, read
* the entry, and jump back N times to seek the "flags" field to read
* the stream full entry. */
lp = lpAppendInteger(lp, flags);
lp = lpAppendInteger(lp, id.ms - master_id.ms);
lp = lpAppendInteger(lp, id.seq - master_id.seq);
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
lp = lpAppendInteger(lp, numfields);
for (int64_t i = 0; i < numfields; i++) {
MutableSlice field = fields[i * 2], value = fields[i * 2 + 1];
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(field.data()), field.size());
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(value.data()), value.size());
}
/* Compute and store the lp-count field. */
int64_t lp_count = numfields;
lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
/* If the item is not compressed, it also has the fields other than
* the values, and an additional num-fields field. */
lp_count += numfields + 1;
}
lp = lpAppendInteger(lp, lp_count);

/* Insert back into the tree in order to update the listpack pointer. */
if (ri.data != lp)
raxInsert(s->rax_tree, (unsigned char*)&rax_key, sizeof(rax_key), lp, NULL);
s->length++;
s->entries_added++;
s->last_id = id;
if (s->length == 1)
s->first_id = id;
if (added_id)
*added_id = id;
return C_OK;
}

OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts& opts,
CmdArgList args) {
DCHECK(!args.empty() && args.size() % 2 == 0);
Expand Down Expand Up @@ -166,22 +493,12 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&

stream* stream_inst = (stream*)it->second.RObjPtr();

// we should really get rid of this monstrousity and rewrite streamAppendItem ourselves here.
unique_ptr<robj*[]> objs(new robj*[args.size()]);
for (size_t i = 0; i < args.size(); ++i) {
objs[i] = createStringObject(args[i].data(), args[i].size());
}

streamID result_id;
const auto& parsed_id = opts.parsed_id;
streamID passed_id = parsed_id.val;
int res = streamAppendItem(stream_inst, objs.get(), args.size() / 2, &result_id,
int res = StreamAppendItem(stream_inst, args, &result_id,
parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq);

for (size_t i = 0; i < args.size(); ++i) {
decrRefCount(objs[i]);
}

if (res != C_OK) {
if (errno == ERANGE)
return OpStatus::OUT_OF_RANGE;
Expand Down

0 comments on commit 842c1e4

Please sign in to comment.