From 79285ae6fb1297c52672e38717fb3a9019d48ec9 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Sun, 18 Jun 2023 22:54:58 -0400 Subject: [PATCH] feat[stream]:implement `XAUTOCLAIM` fix #158 --- docs/about/changelog.md | 2 +- docs/redis-commands/Redis.md | 11 +++--- fakeredis/_stream.py | 29 ++++++++++++--- fakeredis/commands_mixins/streams_mixin.py | 26 +++++++++++++- test/test_mixins/test_streams_commands.py | 42 +++++++++++++++++----- 5 files changed, 87 insertions(+), 23 deletions(-) diff --git a/docs/about/changelog.md b/docs/about/changelog.md index a76529e9..2275902f 100644 --- a/docs/about/changelog.md +++ b/docs/about/changelog.md @@ -10,7 +10,7 @@ description: Change log of all fakeredis releases - Implemented support for various stream groups commands: - `XGROUP CREATE` #161, `XGROUP DESTROY` #164, `XGROUP SETID` #165, `XGROUP DELCONSUMER` #162, `XGROUP CREATECONSUMER` #163, `XINFO GROUPS` #168, `XINFO CONSUMERS` #168, `XINFO STREAM` #169, `XREADGROUP` #171, - `XACK` #157, `XPENDING` #170, `XCLAIM` #159 + `XACK` #157, `XPENDING` #170, `XCLAIM` #159, `XAUTOCLAIM` #158 - Implemented sorted set commands: - `ZRANDMEMBER` #192, `ZDIFF` #187, `ZINTER` #189, `ZUNION` #194, `ZDIFFSTORE` #188, `ZINTERCARD` #190, `ZRANGESTORE` #193 diff --git a/docs/redis-commands/Redis.md b/docs/redis-commands/Redis.md index 7e2709a3..d8759385 100644 --- a/docs/redis-commands/Redis.md +++ b/docs/redis-commands/Redis.md @@ -1442,6 +1442,10 @@ Returns the number of messages that were successfully acknowledged by the consum Appends a new message to a stream. Creates the key if it doesn't exist. +### [XAUTOCLAIM](https://redis.io/commands/xautoclaim/) + +Changes, or acquires, ownership of messages in a consumer group, as if the messages were delivered to as consumer group member. + ### [XCLAIM](https://redis.io/commands/xclaim/) Changes, or acquires, ownership of a message in a consumer group, as if the message was delivered a consumer group member. @@ -1511,11 +1515,4 @@ Returns the messages from a stream within a range of IDs in reverse order. Deletes messages from the beginning of a stream. -### Unsupported stream commands -> To implement support for a command, see [here](../../guides/implement-command/) - -#### [XAUTOCLAIM](https://redis.io/commands/xautoclaim/) (not implemented) - -Changes, or acquires, ownership of messages in a consumer group, as if the messages were delivered to as consumer group member. - diff --git a/fakeredis/_stream.py b/fakeredis/_stream.py index b2fa5a5d..e52503d1 100644 --- a/fakeredis/_stream.py +++ b/fakeredis/_stream.py @@ -211,16 +211,20 @@ def pending_summary(self) -> List[bytes]: ] return data - def claim(self, min_idle_ms: int, msgs: List[bytes], consumer_name: bytes, _time: Optional[int], - force: bool) -> List: + def claim(self, + min_idle_ms: int, + msgs: List[bytes], + consumer_name: bytes, + _time: Optional[int], + force: bool) -> Tuple[List, List]: curr_time = current_time() if _time is None: _time = curr_time self.consumers.get(consumer_name, StreamConsumerInfo(consumer_name)).last_attempt = curr_time - claimed_msgs = [] + claimed_msgs, deleted_msgs = [], [] for msg in msgs: try: - key = StreamEntryKey.parse_str(msg) + key = StreamEntryKey.parse_str(msg) if isinstance(msg, bytes) else msg except Exception: continue if key not in self.pel: @@ -228,14 +232,29 @@ def claim(self, min_idle_ms: int, msgs: List[bytes], consumer_name: bytes, _time self.pel[key] = (consumer_name, _time) # Force claim msg if key in self.stream: claimed_msgs.append(key) + else: + deleted_msgs.append(key) + del self.pel[key] continue if curr_time - self.pel[key][1] < min_idle_ms: continue # Not idle enough time to be claimed self.pel[key] = (consumer_name, _time) if key in self.stream: claimed_msgs.append(key) + else: + deleted_msgs.append(key) + del self.pel[key] self._calc_consumer_last_time() - return claimed_msgs + return sorted(claimed_msgs), sorted(deleted_msgs) + + def read_pel_msgs(self, min_idle_ms: int, start: bytes, count: int): + start_key = StreamEntryKey.parse_str(start) + curr_time = current_time() + msgs = sorted([k for k in self.pel + if (curr_time - self.pel[k][1] >= min_idle_ms) + and k >= start_key]) + count = min(count, len(msgs)) + return msgs[:count] class XStream: diff --git a/fakeredis/commands_mixins/streams_mixin.py b/fakeredis/commands_mixins/streams_mixin.py index 115c41db..9c974d35 100644 --- a/fakeredis/commands_mixins/streams_mixin.py +++ b/fakeredis/commands_mixins/streams_mixin.py @@ -276,7 +276,31 @@ def xclaim(self, key, group_name, consumer_name, min_idle_ms, *args): if idle is not None and idle > 0 and _time is None: _time = current_time() - idle - msgs_claimed = sorted(group.claim(min_idle_ms, msg_ids, consumer_name, _time, force)) + msgs_claimed, _ = group.claim(min_idle_ms, msg_ids, consumer_name, _time, force) + if justid: return [msg.encode() for msg in msgs_claimed] return [stream.format_record(msg) for msg in msgs_claimed] + + @command(name="XAUTOCLAIM", fixed=(Key(XStream), bytes, bytes, Int, bytes), repeat=(bytes,), ) + def xautoclaim(self, key, group_name, consumer_name, min_idle_ms, start, *args): + (count, justid), _ = extract_args(args, ('+count', 'justid')) + count = count or 100 + stream = key.value + if stream is None: + raise SimpleError(msgs.XGROUP_KEY_NOT_FOUND_MSG) + group: StreamGroup = stream.group_get(group_name) + if not group: + raise SimpleError(msgs.XGROUP_GROUP_NOT_FOUND_MSG.format(key, group_name)) + + keys = group.read_pel_msgs(min_idle_ms, start, count) + msgs_claimed, msgs_removed = group.claim(min_idle_ms, keys, consumer_name, None, False) + + res = [ + max(msgs_claimed).encode() if len(msgs_claimed) > 0 else start, + [msg.encode() for msg in msgs_claimed] if justid + else [stream.format_record(msg) for msg in msgs_claimed], + ] + if self.version >= (7,): + res.append([msg.encode() for msg in msgs_removed]) + return res diff --git a/test/test_mixins/test_streams_commands.py b/test/test_mixins/test_streams_commands.py index 2c883f86..cc542344 100644 --- a/test/test_mixins/test_streams_commands.py +++ b/test/test_mixins/test_streams_commands.py @@ -625,8 +625,8 @@ def test_xpending_range_negative(r: redis.Redis): r.xpending_range(stream, group, min=None, max=None, count=None, consumername=0) -@pytest.mark.xfail -def test_xautoclaim(r: redis.Redis): +@pytest.mark.max_server('6.3') +def test_xautoclaim_redis6(r: redis.Redis): stream, group, consumer1, consumer2 = "stream", "group", "consumer1", "consumer2" message_id1 = r.xadd(stream, {"john": "wick"}) @@ -652,7 +652,35 @@ def test_xautoclaim(r: redis.Redis): assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=message_id2, justid=True) == [message_id2] -def test_xclaim_trimmed(r: redis.Redis): +@pytest.mark.min_server('7') +def test_xautoclaim_redis7(r: redis.Redis): + stream, group, consumer1, consumer2 = "stream", "group", "consumer1", "consumer2" + + message_id1 = r.xadd(stream, {"john": "wick"}) + message_id2 = r.xadd(stream, {"johny": "deff"}) + message = get_stream_message(r, stream, message_id1) + r.xgroup_create(stream, group, 0) + + # trying to claim a message that isn't already pending doesn't + # do anything + assert r.xautoclaim(stream, group, consumer2, min_idle_time=0) == [b"0-0", [], []] + + # read the group as consumer1 to initially claim the messages + r.xreadgroup(group, consumer1, streams={stream: ">"}) + + # claim one message as consumer2 + response = r.xautoclaim(stream, group, consumer2, min_idle_time=0, count=1) + assert response[1] == [message] + + # reclaim the messages as consumer1, but use the justid argument + # which only returns message ids + assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=0, justid=True) == [ + message_id1, message_id2] + assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=message_id2, justid=True) == [message_id2] + + +@pytest.mark.min_server('7') +def test_xclaim_trimmed_redis7(r: redis.Redis): # xclaim should not raise an exception if the item is not there stream, group = "stream", "group" @@ -699,10 +727,6 @@ def test_xclaim(r: redis.Redis): # reclaim the message as consumer1, but use the justid argument # which only returns message ids assert r.xclaim( - stream, - group, - consumer1, - min_idle_time=0, - message_ids=(message_id,), - justid=True, + stream, group, consumer1, + min_idle_time=0, message_ids=(message_id,), justid=True, ) == [message_id, ]