From fa99fd57b862d14e02ad24b22fbc9ed3d3ea48c1 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Sun, 18 Jun 2023 22:11:40 -0400 Subject: [PATCH] fix[stream]:implement `XCLAIM` Fix #159 --- docs/about/changelog.md | 2 +- docs/redis-commands/Redis.md | 8 +-- fakeredis/_helpers.py | 4 ++ fakeredis/_msgs.py | 1 + fakeredis/_stream.py | 57 +++++++++++++++++----- fakeredis/commands_mixins/streams_mixin.py | 24 +++++---- test/test_mixins/test_streams_commands.py | 34 ++++++++++++- 7 files changed, 102 insertions(+), 28 deletions(-) diff --git a/docs/about/changelog.md b/docs/about/changelog.md index 9e09654b..a76529e9 100644 --- a/docs/about/changelog.md +++ b/docs/about/changelog.md @@ -10,7 +10,7 @@ description: Change log of all fakeredis releases - Implemented support for various stream groups commands: - `XGROUP CREATE` #161, `XGROUP DESTROY` #164, `XGROUP SETID` #165, `XGROUP DELCONSUMER` #162, `XGROUP CREATECONSUMER` #163, `XINFO GROUPS` #168, `XINFO CONSUMERS` #168, `XINFO STREAM` #169, `XREADGROUP` #171, - `XACK` #157, `XPENDING` #170 + `XACK` #157, `XPENDING` #170, `XCLAIM` #159 - Implemented sorted set commands: - `ZRANDMEMBER` #192, `ZDIFF` #187, `ZINTER` #189, `ZUNION` #194, `ZDIFFSTORE` #188, `ZINTERCARD` #190, `ZRANGESTORE` #193 diff --git a/docs/redis-commands/Redis.md b/docs/redis-commands/Redis.md index dad5c503..7e2709a3 100644 --- a/docs/redis-commands/Redis.md +++ b/docs/redis-commands/Redis.md @@ -1442,6 +1442,10 @@ Returns the number of messages that were successfully acknowledged by the consum Appends a new message to a stream. Creates the key if it doesn't exist. +### [XCLAIM](https://redis.io/commands/xclaim/) + +Changes, or acquires, ownership of a message in a consumer group, as if the message was delivered a consumer group member. + ### [XDEL](https://redis.io/commands/xdel/) Returns the number of messages after removing them from a stream. @@ -1514,8 +1518,4 @@ Deletes messages from the beginning of a stream. Changes, or acquires, ownership of messages in a consumer group, as if the messages were delivered to as consumer group member. -#### [XCLAIM](https://redis.io/commands/xclaim/) (not implemented) - -Changes, or acquires, ownership of a message in a consumer group, as if the message was delivered a consumer group member. - diff --git a/fakeredis/_helpers.py b/fakeredis/_helpers.py index 23e40c22..37a0d9dd 100644 --- a/fakeredis/_helpers.py +++ b/fakeredis/_helpers.py @@ -34,6 +34,10 @@ class NoResponse: BGSAVE_STARTED = SimpleString(b'Background saving started') +def current_time(): + return int(time.time() * 1000) + + def null_terminate(s): # Redis uses C functions on some strings, which means they stop at the # first NULL. diff --git a/fakeredis/_msgs.py b/fakeredis/_msgs.py index 3073a96d..6a9e2f07 100644 --- a/fakeredis/_msgs.py +++ b/fakeredis/_msgs.py @@ -65,6 +65,7 @@ BIT_ARG_MUST_BE_ZERO_OR_ONE = "ERR The bit argument must be 1 or 0." XADD_ID_LOWER_THAN_LAST = "ERR The ID specified in XADD is equal or smaller than the target stream top item" XADD_INVALID_ID = 'ERR Invalid stream ID specified as stream command argument' +XGROUP_BUSYGROUP = 'ERR BUSYGROUP Consumer Group name already exists' XGROUP_GROUP_NOT_FOUND_MSG = "NOGROUP No such consumer group '%s' for key name '%s'" XGROUP_KEY_NOT_FOUND_MSG = ( "ERR The XGROUP subcommand requires the key to exist." diff --git a/fakeredis/_stream.py b/fakeredis/_stream.py index ef855ef3..b2fa5a5d 100644 --- a/fakeredis/_stream.py +++ b/fakeredis/_stream.py @@ -7,6 +7,7 @@ from typing import List, Union, Tuple, Optional, NamedTuple, Dict, Any from fakeredis._commands import BeforeAny, AfterAny +from fakeredis._helpers import current_time class StreamEntryKey(NamedTuple): @@ -51,10 +52,6 @@ def decode(cls, value: bytes, exclusive=False): return cls(StreamEntryKey.parse_str(value), exclusive) -def _current_time(): - return int(time.time() * 1000) - - @dataclass class StreamConsumerInfo(object): name: bytes @@ -65,7 +62,7 @@ class StreamConsumerInfo(object): def __init__(self, name): self.name = name self.pending = 0 - _time = _current_time() + _time = current_time() self.last_attempt = _time self.last_success = _time @@ -114,7 +111,7 @@ def del_consumer(self, consumer_name: bytes) -> int: return res def consumers_info(self): - return [self.consumers[k].info(_current_time()) for k in self.consumers] + return [self.consumers[k].info(current_time()) for k in self.consumers] def group_info(self) -> List[bytes]: start_index, _ = self.stream.find_index(self.start_key) @@ -135,7 +132,7 @@ def group_info(self) -> List[bytes]: return list(itertools.chain(*res.items())) def group_read(self, consumer_name: bytes, start_id: bytes, count: int, noack: bool) -> List: - _time = _current_time() + _time = current_time() if consumer_name not in self.consumers: self.consumers[consumer_name] = StreamConsumerInfo(consumer_name) @@ -155,6 +152,14 @@ def group_read(self, consumer_name: bytes, start_id: bytes, count: int, noack: b self.consumers[consumer_name].pending += len(ids_read) return [self.stream.format_record(x) for x in ids_read] + def _calc_consumer_last_time(self): + new_last_success_map = {k: min(v)[1] for k, v in itertools.groupby(self.pel.values(), key=itemgetter(0))} + for consumer in new_last_success_map: + if consumer not in self.consumers: + self.consumers[consumer] = StreamConsumerInfo(consumer) + self.consumers[consumer].last_attempt = new_last_success_map[consumer] + self.consumers[consumer].last_success = new_last_success_map[consumer] + def ack(self, args: Tuple[bytes]) -> int: res = 0 for k in args: @@ -167,10 +172,7 @@ def ack(self, args: Tuple[bytes]) -> int: self.consumers[consumer_name].pending -= 1 del self.pel[parsed] res += 1 - new_last_success_map = {k: min(v)[1] for k, v in itertools.groupby(self.pel.values(), key=itemgetter(0))} - for k in new_last_success_map: - self.consumers[k].last_attempt = new_last_success_map[k] - self.consumers[k].last_success = new_last_success_map[k] + self._calc_consumer_last_time() return res def pending(self, idle: Optional[int], @@ -178,7 +180,7 @@ def pending(self, idle: Optional[int], end: Optional[StreamRangeTest], count: Optional[int], consumer: Optional[str]): - _time = _current_time() + _time = current_time() relevent_ids = list(self.pel.keys()) if consumer is not None: relevent_ids = [k for k in relevent_ids @@ -209,6 +211,32 @@ def pending_summary(self) -> List[bytes]: ] return data + def claim(self, min_idle_ms: int, msgs: List[bytes], consumer_name: bytes, _time: Optional[int], + force: bool) -> List: + curr_time = current_time() + if _time is None: + _time = curr_time + self.consumers.get(consumer_name, StreamConsumerInfo(consumer_name)).last_attempt = curr_time + claimed_msgs = [] + for msg in msgs: + try: + key = StreamEntryKey.parse_str(msg) + except Exception: + continue + if key not in self.pel: + if force: + self.pel[key] = (consumer_name, _time) # Force claim msg + if key in self.stream: + claimed_msgs.append(key) + continue + if curr_time - self.pel[key][1] < min_idle_ms: + continue # Not idle enough time to be claimed + self.pel[key] = (consumer_name, _time) + if key in self.stream: + claimed_msgs.append(key) + self._calc_consumer_last_time() + return claimed_msgs + class XStream: """Class representing stream. @@ -231,7 +259,7 @@ def __init__(self): self._max_deleted_id = StreamEntryKey(0, 0) self._entries_added = 0 - def group_get(self, group_name: bytes) -> StreamGroup: + def group_get(self, group_name: bytes) -> Optional[StreamGroup]: return self._groups.get(group_name, None) def group_add(self, name: bytes, start_key_str: bytes, entries_read: Union[int, None]) -> None: @@ -357,6 +385,9 @@ def __getitem__(self, key): return self._values_dict[StreamEntryKey.parse_str(key)] return None + def __contains__(self, key: StreamEntryKey): + return key in self._values_dict + def find_index(self, entry_key: StreamEntryKey, from_left=True) -> Tuple[int, bool]: """Find the closest index to entry_key_str in the stream :param entry_key: key for the entry. diff --git a/fakeredis/commands_mixins/streams_mixin.py b/fakeredis/commands_mixins/streams_mixin.py index 5e7fa2ff..ff7e4816 100644 --- a/fakeredis/commands_mixins/streams_mixin.py +++ b/fakeredis/commands_mixins/streams_mixin.py @@ -4,7 +4,7 @@ import fakeredis._msgs as msgs from fakeredis._command_args_parsing import extract_args from fakeredis._commands import Key, command, CommandItem, Int -from fakeredis._helpers import SimpleError, casematch, OK +from fakeredis._helpers import SimpleError, casematch, OK, current_time from fakeredis._stream import XStream, StreamRangeTest, StreamGroup @@ -19,7 +19,7 @@ def xadd(self, key, *args): elements = left_args[1:] if not elements or len(elements) % 2 != 0: raise SimpleError(msgs.WRONG_ARGS_MSG6.format('XADD')) - stream = key.value or XStream() + stream = key.value if key.value is not None else XStream() if self.version < (7,) and entry_key != b'*' and not StreamRangeTest.valid_key(entry_key): raise SimpleError(msgs.XADD_INVALID_ID) entry_key = stream.add(elements, entry_key=entry_key) @@ -141,7 +141,7 @@ def xreadgroup(self, group_const, group_name, consumer_name, *args): group: StreamGroup = item.value.group_get(group_name) if not group: raise SimpleError(msgs.XGROUP_GROUP_NOT_FOUND_MSG.format(left_args[i], group_name)) - group_params.append((group, item.key, left_args[i + num_streams],)) + group_params.append((group, left_args[i], left_args[i + num_streams],)) if timeout is None: return self._xreadgroup(consumer_name, group_params, count, noack, False) else: @@ -190,12 +190,15 @@ def xpending(self, key, group_name, *args): else: return group.pending_summary() - @command(name="XGROUP CREATE", fixed=(Key(XStream), bytes, bytes), repeat=(bytes,), ) + @command(name="XGROUP CREATE", fixed=(Key(XStream), bytes, bytes), repeat=(bytes,), flags=msgs.FLAG_LEAVE_EMPTY_VAL) def xgroup_create(self, key, group_name, start_key, *args): (mkstream, entries_read,), _ = extract_args(args, ('mkstream', '+entriesread')) if key.value is None and not mkstream: raise SimpleError(msgs.XGROUP_KEY_NOT_FOUND_MSG) + if key.value.group_get(group_name) is not None: + raise SimpleError(msgs.XGROUP_BUSYGROUP) key.value.group_add(group_name, start_key, entries_read) + key.updated() return OK @command(name="XGROUP SETID", fixed=(Key(XStream), bytes, bytes), repeat=(bytes,), ) @@ -258,9 +261,10 @@ def xinfo_consumers(self, key, group_name, ): @command(name="XCLAIM", fixed=(Key(XStream), bytes, bytes, Int, bytes), repeat=(bytes,), ) def xclaim(self, key, group_name, consumer_name, min_idle_ms, *args): - if key.value is None: + stream = key.value + if stream is None: raise SimpleError(msgs.XGROUP_KEY_NOT_FOUND_MSG) - group: StreamGroup = key.value.group_get(group_name) + group: StreamGroup = stream.group_get(group_name) if not group: raise SimpleError(msgs.XGROUP_GROUP_NOT_FOUND_MSG.format(key, group_name)) @@ -269,7 +273,9 @@ def xclaim(self, key, group_name, consumer_name, min_idle_ms, *args): error_on_unexpected=False, left_from_first_unexpected=False) - msgs_claimed = group.claim(min_idle_ms, msg_ids, consumer_name, idle, _time, force) + if idle is not None and idle > 0 and _time is None: + _time = current_time() - idle + msgs_claimed = sorted(group.claim(min_idle_ms, msg_ids, consumer_name, _time, force)) if justid: - return msgs_claimed - key.value.get + return [msg.encode() for msg in msgs_claimed] + return [stream.format_record(msg) for msg in msgs_claimed] diff --git a/test/test_mixins/test_streams_commands.py b/test/test_mixins/test_streams_commands.py index 82d7f922..2c883f86 100644 --- a/test/test_mixins/test_streams_commands.py +++ b/test/test_mixins/test_streams_commands.py @@ -652,7 +652,6 @@ def test_xautoclaim(r: redis.Redis): assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=message_id2, justid=True) == [message_id2] -@pytest.mark.xfail def test_xclaim_trimmed(r: redis.Redis): # xclaim should not raise an exception if the item is not there stream, group = "stream", "group" @@ -674,3 +673,36 @@ def test_xclaim_trimmed(r: redis.Redis): item = r.xclaim(stream, group, "consumer2", 0, [sid1, sid2]) assert len(item) == 1 assert item[0][0] == sid2 + + +def test_xclaim(r: redis.Redis): + stream, group, consumer1, consumer2 = "stream", "group", "consumer1", "consumer2" + + message_id = r.xadd(stream, {"john": "wick"}) + message = get_stream_message(r, stream, message_id) + r.xgroup_create(stream, group, 0) + + # trying to claim a message that isn't already pending doesn't + # do anything + assert r.xclaim( + stream, group, consumer2, min_idle_time=0, message_ids=(message_id,) + ) == [] + + # read the group as consumer1 to initially claim the messages + r.xreadgroup(group, consumer1, streams={stream: ">"}) + + # claim the message as consumer2 + assert r.xclaim( + stream, group, consumer2, min_idle_time=0, message_ids=(message_id,) + ) == [message, ] + + # reclaim the message as consumer1, but use the justid argument + # which only returns message ids + assert r.xclaim( + stream, + group, + consumer1, + min_idle_time=0, + message_ids=(message_id,), + justid=True, + ) == [message_id, ]