fix(stream): correct subkey prefix encoding in DestroyGroup#3420
Conversation
There was a problem hiding this comment.
I realized that we don't need to track delete_cnt. We can just use DeleteRange directly without iterating.
Please take a look at cmd_stream.cc L575. We only care whether delete_cnt is zero or not, rather than its specific value.
Can we avoid global scans for consumer metadata and PEL?
For instance, we could perform a prefix scan for each. By reusing the encoding prefixes defined in redis_stream.cc (line 221) and redis_stream.cc (line 266) respectively.
| PutFixed64(&sub_key_prefix, group_name.size()); | ||
| sub_key_prefix += group_name; | ||
|
|
||
| std::string next_version_prefix_key = |
There was a problem hiding this comment.
Isn't this upper bound too large?
There was a problem hiding this comment.
Yes, the upper bound in the original implementation is too large, but I have a filter in the iterator loop to make sure only delete the needed data.
And now, I have changed to use the same version's end_key instead of the next version with filter. thanks for your suggestion.
4bf5080 to
b88e056
Compare
Good idea, fixed with DeleteRange, the code is much simple now |
8b51583 to
1ecf84f
Compare
| range_options.with_count = true; | ||
| range_options.exclude_start = true; | ||
| std::vector<redis::StreamEntry> entries; | ||
| std::string c1 = "c1", c2 = "c2"; |
There was a problem hiding this comment.
Could we split this into two declaration?
| s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &delete_cnt); | ||
| EXPECT_TRUE(delete_cnt == 0); | ||
| bool destroyed = false; | ||
| s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &destroyed); |
There was a problem hiding this comment.
Could we add GetGroupInfo before and after destroy?
There was a problem hiding this comment.
Pull request overview
This PR fixes Stream::DestroyGroup so it scans/deletes the correct internal subkey prefixes for stream consumer group data (group metadata, consumer metadata, and PEL entries), preventing missed deletions or accidental deletion of unrelated stream entry keys during XGROUP DESTROY.
Changes:
- Correct
DestroyGroupsubkey prefix construction by including theUINT64_MAXdelimiter andStreamSubkeyType, and delete viaDeleteRangefor each relevant subkey type. - Change
DestroyGroupAPI to return a booleandestroyed(group existed and was destroyed) instead of a deleted-key count, and updateXGROUP DESTROYoutput accordingly. - Add/extend tests to verify
DestroyGroupcleans up consumers/PEL entries and does not affect other groups.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/cppunit/types/stream_test.cc | Updates existing destroy test to use destroyed bool; adds new coverage for cleanup and multi-group isolation. |
| src/types/redis_stream.h | Updates DestroyGroup signature to return bool *destroyed. |
| src/types/redis_stream.cc | Fixes subkey prefix encoding and uses DeleteRange per subkey type; updates metadata accordingly. |
| src/commands/cmd_stream.cc | Updates XGROUP DESTROY command implementation to use destroyed and return integer 1/0. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
13fe3fd to
97d31a9
Compare
|
Sorry, I looked at the wrong PR. I didn't mean to close it. |
|
|
@songqing Thank you for your hard work. |
) DestroyGroup was constructing scan prefixes using only PutFixed64(group_name.size()) + group_name, which does not match the actual internal key format used by [internalKeyFromGroupName](https://github.com/apache/kvrocks/blob/da5e46307047b3e89753f0c9c02996430e758a5f/src/types/redis_stream.cc#L171), internalKeyFromConsumerName, and internalPelKeyFromGroupAndEntryId. The real keys are prefixed with PutFixed64(UINT64_MAX) + PutFixed8(type) before the group name length and name. The wrong prefix meant the scan range could miss all group/consumer/PEL keys or match unrelated stream entry keys, causing data leaks or corruption on XGROUP DESTROY. Fix by iterating over the three subkey types (GroupMetadata, ConsumerMetadata, PelEntry) and building the correct prefix for each, consistent with the internal key encoding helpers. Besides, add two test cases verifying that DestroyGroup correctly cleans up all group-related subkeys (group metadata, consumer metadata, PEL entries): - DestroyGroupCleansUpConsumersAndPelEntries: verifies full cleanup including that stream entries are preserved and the group can be re-created cleanly. - DestroyGroupDoesNotAffectOtherGroups: verifies that destroying one group does not affect another group's consumers or PEL entries.



DestroyGroup was constructing scan prefixes using only
PutFixed64(group_name.size()) + group_name, which does not match the
actual internal key format used by internalKeyFromGroupName,
internalKeyFromConsumerName, and internalPelKeyFromGroupAndEntryId.
The real keys are prefixed with PutFixed64(UINT64_MAX) + PutFixed8(type)
before the group name length and name. The wrong prefix meant the scan
range could miss all group/consumer/PEL keys or match unrelated stream
entry keys, causing data leaks or corruption on XGROUP DESTROY.
Fix by iterating over the three subkey types (GroupMetadata,
ConsumerMetadata, PelEntry) and building the correct prefix for each,
consistent with the internal key encoding helpers.
Besides, add two test cases verifying that DestroyGroup correctly cleans up all
group-related subkeys (group metadata, consumer metadata, PEL entries):
including that stream entries are preserved and the group can be
re-created cleanly.
group does not affect another group's consumers or PEL entries.