You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
rax *cgroups; /* 一棵字典树,Key 是 group_name,Value 是 group 结构的指针 */uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
streamID first_id; /* The first non-tombstone entry, zero if empty. */
streamID max_deleted_entry_id; /* The maximal ID that was deleted. */uint64_t entries_added; /* All time count of elements added. */
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
From issue #1584
Draft PR #1673
方案原文链接:https://t8dj523v5p.feishu.cn/wiki/YRmJwKI5oiPYtLkiQfcczmd2ndd
当前实现进度/待办
实现
测试
pika 实现方案
阶段一:实现基本的数据格式
可以参照 blackwidow 的实现,但是底层不用 rocksdb,而是选取现有的五种数据结构实现:set,zset,list,hash,string。但抽象其实还是和这五类数据结构一样的,将需要存储的数据分为两个部分:
我们需要先捋清楚数据存储的格式以及存储的需求,再选取合适的数据结构。
先看结论:
stream data(message)
存储格式:类似于顺序存储的键值对,键是 ID,值是消息内容
Key:id (uint64 * 2) Value:message(string)
存储需求:表现上是一个队列,但整体功能来看更像是一个有序 kv 结构,最核心的功能有两个:
结构选取:
为什么其它四种类型不可用?
那为什么可以用 hash 实现?
消息的存储方式:
hash filed 用于存储 id,value 用于存储 message
hash 的 key 直接使用 stream 的 key(使用时需要避免 hash 和 stream 的key冲突!)
增加消息:直接用
HSet()
实现,只要保证 id 递增,就可以当作是往队列末尾增加消息(其实由于它们 key 值连续,最后在rocksdb中同一层的实际存储按道理也是按序的?)。读取消息:用
PKHScanRange()
,只要把 pattern 设置为"*"
,就可以当作范围扫描,从而实现 “返回特定 id 之后的 n 条消息” 这一需求,stream meta(stream, consumer_group, consumer, pel)
存储格式:由于 Stream 中有消费组的存在,导致它的依赖关系是树形的,一层套一层。比如一个 Stream 里面有多个 cgroup ,每个 cgroup 里面又有多个 consumer 。其中 consumer 和 cgroup 都有自己的元数据。
这让 Stream 的元数据不能简单地设置成下面这种固定长度的形式:
拿 stream 的元数据举例:
其中 cgroups 是指向一棵字典树的指针,树内存储的是
group_name : group_meta_data
这样的键值对。但 pika 的基本数据结构中,没有字典树的结构,更不能用指针指向这些树。
现在看来,我们可以将上述的问题拆分成下面两个问题:
只要解决这两个问题,元数据的形式直接和 redis 一致就可以了。
对上述两个问题的解决方案:
最后,在上述方案的基础上,使用 hash 设计出所有的元数据结构,元数据的内容参照 redis stream。
Key: tree_id(4B) + message_id(streamID) Value: mstime_t delivery_time uint64_t delivery_count treeID consum
具体需要为上述元数据抽象出如下 class:
每个元数据对应两种个类,分别用于新建元数据和读取解析现有的元数据
除了上述数据类型设计,还需要考虑 TreeID 的生成
TreeID 的生成其实就是返回一个不断递增的 uint32_t 整形,只需要持久化该整形即可,可以强行用 hash 实现,在 hash 里面只存一个字段就行。
所以 TreeID 是存储在用于存储 StreamMeta 的 hash 中,该 hash 的 key 统一为 “STREAM”
redis stream 实现原理(附录)
宏观结构
宏观上来看所谓消息队列其实就是一个 List,链头通过插入的方式发布消息,以向前遍历的方式进行消费。
通常来说,消息队列除了队列的性质,还需要支持对消息的快速查找,所以一般的实现方式都是一个类似于跳表的形式加上索引结构。前者用于保持队列的性质,后者用于低时间复杂度的查找。
数据结构:radix tree
redis stream 中,针对每一条消息,都必须设置唯一且递增的消息 ID。
这种一段时间内的连续 ID,前缀都有一些高度重复性,所以用这类前缀树可以有效节约空间使用率。
除了用于存储消息,同 hash_map 的使用方式类似,它还被用来存储以下关系。
数据结构:listpack
*A lists of strings serialization format:*一个字符串列表的序列化格式,也就是将一个字符串列表进行序列化存储。
简单的理解 listpack 就是一款专门为
节省内存空间
,通过特定的编码方式将数据进行编码和解码的数据结构,这种结构天生就是为节省空间而存在的。总的来看,每个 stream 都用对应一个 radix tree,value 则存储了一个指向 listpack 的指针,同一个 listpack 可能存储多个消息的 value。
新的消息条目会被添加到Listpack中。如果添加新条目后的Listpack大小超过了设定的限制,那么会从Radix Tree中分裂出一个新的Listpack来存储超出的数据。
多播支持
实际上就是对消费组的支持,主要需要注意以下细节:
持久化方案
理论上需要将 stream 和其用到的数据结构对 RDB 和 AOF 做支持。
RDB 支持:
参考博客
Beta Was this translation helpful? Give feedback.
All reactions