Skip to content

Commit

Permalink
fix[stream]:implement XCLAIM
Browse files Browse the repository at this point in the history
Fix #159
  • Loading branch information
cunla committed Jun 19, 2023
1 parent ba21189 commit fa99fd5
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 28 deletions.
2 changes: 1 addition & 1 deletion docs/about/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description: Change log of all fakeredis releases
- Implemented support for various stream groups commands:
- `XGROUP CREATE` #161, `XGROUP DESTROY` #164, `XGROUP SETID` #165, `XGROUP DELCONSUMER` #162,
`XGROUP CREATECONSUMER` #163, `XINFO GROUPS` #168, `XINFO CONSUMERS` #168, `XINFO STREAM` #169, `XREADGROUP` #171,
`XACK` #157, `XPENDING` #170
`XACK` #157, `XPENDING` #170, `XCLAIM` #159
- Implemented sorted set commands:
- `ZRANDMEMBER` #192, `ZDIFF` #187, `ZINTER` #189, `ZUNION` #194, `ZDIFFSTORE` #188,
`ZINTERCARD` #190, `ZRANGESTORE` #193
Expand Down
8 changes: 4 additions & 4 deletions docs/redis-commands/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,10 @@ Returns the number of messages that were successfully acknowledged by the consum

Appends a new message to a stream. Creates the key if it doesn't exist.

### [XCLAIM](https://redis.io/commands/xclaim/)

Changes, or acquires, ownership of a message in a consumer group, as if the message was delivered a consumer group member.

### [XDEL](https://redis.io/commands/xdel/)

Returns the number of messages after removing them from a stream.
Expand Down Expand Up @@ -1514,8 +1518,4 @@ Deletes messages from the beginning of a stream.

Changes, or acquires, ownership of messages in a consumer group, as if the messages were delivered to as consumer group member.

#### [XCLAIM](https://redis.io/commands/xclaim/) <small>(not implemented)</small>

Changes, or acquires, ownership of a message in a consumer group, as if the message was delivered a consumer group member.


4 changes: 4 additions & 0 deletions fakeredis/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class NoResponse:
BGSAVE_STARTED = SimpleString(b'Background saving started')


def current_time():
return int(time.time() * 1000)


def null_terminate(s):
# Redis uses C functions on some strings, which means they stop at the
# first NULL.
Expand Down
1 change: 1 addition & 0 deletions fakeredis/_msgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
BIT_ARG_MUST_BE_ZERO_OR_ONE = "ERR The bit argument must be 1 or 0."
XADD_ID_LOWER_THAN_LAST = "ERR The ID specified in XADD is equal or smaller than the target stream top item"
XADD_INVALID_ID = 'ERR Invalid stream ID specified as stream command argument'
XGROUP_BUSYGROUP = 'ERR BUSYGROUP Consumer Group name already exists'
XGROUP_GROUP_NOT_FOUND_MSG = "NOGROUP No such consumer group '%s' for key name '%s'"
XGROUP_KEY_NOT_FOUND_MSG = (
"ERR The XGROUP subcommand requires the key to exist."
Expand Down
57 changes: 44 additions & 13 deletions fakeredis/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import List, Union, Tuple, Optional, NamedTuple, Dict, Any

from fakeredis._commands import BeforeAny, AfterAny
from fakeredis._helpers import current_time


class StreamEntryKey(NamedTuple):
Expand Down Expand Up @@ -51,10 +52,6 @@ def decode(cls, value: bytes, exclusive=False):
return cls(StreamEntryKey.parse_str(value), exclusive)


def _current_time():
return int(time.time() * 1000)


@dataclass
class StreamConsumerInfo(object):
name: bytes
Expand All @@ -65,7 +62,7 @@ class StreamConsumerInfo(object):
def __init__(self, name):
self.name = name
self.pending = 0
_time = _current_time()
_time = current_time()
self.last_attempt = _time
self.last_success = _time

Expand Down Expand Up @@ -114,7 +111,7 @@ def del_consumer(self, consumer_name: bytes) -> int:
return res

def consumers_info(self):
return [self.consumers[k].info(_current_time()) for k in self.consumers]
return [self.consumers[k].info(current_time()) for k in self.consumers]

def group_info(self) -> List[bytes]:
start_index, _ = self.stream.find_index(self.start_key)
Expand All @@ -135,7 +132,7 @@ def group_info(self) -> List[bytes]:
return list(itertools.chain(*res.items()))

def group_read(self, consumer_name: bytes, start_id: bytes, count: int, noack: bool) -> List:
_time = _current_time()
_time = current_time()
if consumer_name not in self.consumers:
self.consumers[consumer_name] = StreamConsumerInfo(consumer_name)

Expand All @@ -155,6 +152,14 @@ def group_read(self, consumer_name: bytes, start_id: bytes, count: int, noack: b
self.consumers[consumer_name].pending += len(ids_read)
return [self.stream.format_record(x) for x in ids_read]

def _calc_consumer_last_time(self):
new_last_success_map = {k: min(v)[1] for k, v in itertools.groupby(self.pel.values(), key=itemgetter(0))}
for consumer in new_last_success_map:
if consumer not in self.consumers:
self.consumers[consumer] = StreamConsumerInfo(consumer)
self.consumers[consumer].last_attempt = new_last_success_map[consumer]
self.consumers[consumer].last_success = new_last_success_map[consumer]

def ack(self, args: Tuple[bytes]) -> int:
res = 0
for k in args:
Expand All @@ -167,18 +172,15 @@ def ack(self, args: Tuple[bytes]) -> int:
self.consumers[consumer_name].pending -= 1
del self.pel[parsed]
res += 1
new_last_success_map = {k: min(v)[1] for k, v in itertools.groupby(self.pel.values(), key=itemgetter(0))}
for k in new_last_success_map:
self.consumers[k].last_attempt = new_last_success_map[k]
self.consumers[k].last_success = new_last_success_map[k]
self._calc_consumer_last_time()
return res

def pending(self, idle: Optional[int],
start: Optional[StreamRangeTest],
end: Optional[StreamRangeTest],
count: Optional[int],
consumer: Optional[str]):
_time = _current_time()
_time = current_time()
relevent_ids = list(self.pel.keys())
if consumer is not None:
relevent_ids = [k for k in relevent_ids
Expand Down Expand Up @@ -209,6 +211,32 @@ def pending_summary(self) -> List[bytes]:
]
return data

def claim(self, min_idle_ms: int, msgs: List[bytes], consumer_name: bytes, _time: Optional[int],
force: bool) -> List:
curr_time = current_time()
if _time is None:
_time = curr_time
self.consumers.get(consumer_name, StreamConsumerInfo(consumer_name)).last_attempt = curr_time
claimed_msgs = []
for msg in msgs:
try:
key = StreamEntryKey.parse_str(msg)
except Exception:
continue
if key not in self.pel:
if force:
self.pel[key] = (consumer_name, _time) # Force claim msg
if key in self.stream:
claimed_msgs.append(key)
continue
if curr_time - self.pel[key][1] < min_idle_ms:
continue # Not idle enough time to be claimed
self.pel[key] = (consumer_name, _time)
if key in self.stream:
claimed_msgs.append(key)
self._calc_consumer_last_time()
return claimed_msgs


class XStream:
"""Class representing stream.
Expand All @@ -231,7 +259,7 @@ def __init__(self):
self._max_deleted_id = StreamEntryKey(0, 0)
self._entries_added = 0

def group_get(self, group_name: bytes) -> StreamGroup:
def group_get(self, group_name: bytes) -> Optional[StreamGroup]:
return self._groups.get(group_name, None)

def group_add(self, name: bytes, start_key_str: bytes, entries_read: Union[int, None]) -> None:
Expand Down Expand Up @@ -357,6 +385,9 @@ def __getitem__(self, key):
return self._values_dict[StreamEntryKey.parse_str(key)]
return None

def __contains__(self, key: StreamEntryKey):
return key in self._values_dict

def find_index(self, entry_key: StreamEntryKey, from_left=True) -> Tuple[int, bool]:
"""Find the closest index to entry_key_str in the stream
:param entry_key: key for the entry.
Expand Down
24 changes: 15 additions & 9 deletions fakeredis/commands_mixins/streams_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import fakeredis._msgs as msgs
from fakeredis._command_args_parsing import extract_args
from fakeredis._commands import Key, command, CommandItem, Int
from fakeredis._helpers import SimpleError, casematch, OK
from fakeredis._helpers import SimpleError, casematch, OK, current_time
from fakeredis._stream import XStream, StreamRangeTest, StreamGroup


Expand All @@ -19,7 +19,7 @@ def xadd(self, key, *args):
elements = left_args[1:]
if not elements or len(elements) % 2 != 0:
raise SimpleError(msgs.WRONG_ARGS_MSG6.format('XADD'))
stream = key.value or XStream()
stream = key.value if key.value is not None else XStream()
if self.version < (7,) and entry_key != b'*' and not StreamRangeTest.valid_key(entry_key):
raise SimpleError(msgs.XADD_INVALID_ID)
entry_key = stream.add(elements, entry_key=entry_key)
Expand Down Expand Up @@ -141,7 +141,7 @@ def xreadgroup(self, group_const, group_name, consumer_name, *args):
group: StreamGroup = item.value.group_get(group_name)
if not group:
raise SimpleError(msgs.XGROUP_GROUP_NOT_FOUND_MSG.format(left_args[i], group_name))
group_params.append((group, item.key, left_args[i + num_streams],))
group_params.append((group, left_args[i], left_args[i + num_streams],))
if timeout is None:
return self._xreadgroup(consumer_name, group_params, count, noack, False)
else:
Expand Down Expand Up @@ -190,12 +190,15 @@ def xpending(self, key, group_name, *args):
else:
return group.pending_summary()

@command(name="XGROUP CREATE", fixed=(Key(XStream), bytes, bytes), repeat=(bytes,), )
@command(name="XGROUP CREATE", fixed=(Key(XStream), bytes, bytes), repeat=(bytes,), flags=msgs.FLAG_LEAVE_EMPTY_VAL)
def xgroup_create(self, key, group_name, start_key, *args):
(mkstream, entries_read,), _ = extract_args(args, ('mkstream', '+entriesread'))
if key.value is None and not mkstream:
raise SimpleError(msgs.XGROUP_KEY_NOT_FOUND_MSG)
if key.value.group_get(group_name) is not None:
raise SimpleError(msgs.XGROUP_BUSYGROUP)
key.value.group_add(group_name, start_key, entries_read)
key.updated()
return OK

@command(name="XGROUP SETID", fixed=(Key(XStream), bytes, bytes), repeat=(bytes,), )
Expand Down Expand Up @@ -258,9 +261,10 @@ def xinfo_consumers(self, key, group_name, ):

@command(name="XCLAIM", fixed=(Key(XStream), bytes, bytes, Int, bytes), repeat=(bytes,), )
def xclaim(self, key, group_name, consumer_name, min_idle_ms, *args):
if key.value is None:
stream = key.value
if stream is None:
raise SimpleError(msgs.XGROUP_KEY_NOT_FOUND_MSG)
group: StreamGroup = key.value.group_get(group_name)
group: StreamGroup = stream.group_get(group_name)
if not group:
raise SimpleError(msgs.XGROUP_GROUP_NOT_FOUND_MSG.format(key, group_name))

Expand All @@ -269,7 +273,9 @@ def xclaim(self, key, group_name, consumer_name, min_idle_ms, *args):
error_on_unexpected=False,
left_from_first_unexpected=False)

msgs_claimed = group.claim(min_idle_ms, msg_ids, consumer_name, idle, _time, force)
if idle is not None and idle > 0 and _time is None:
_time = current_time() - idle
msgs_claimed = sorted(group.claim(min_idle_ms, msg_ids, consumer_name, _time, force))
if justid:
return msgs_claimed
key.value.get
return [msg.encode() for msg in msgs_claimed]
return [stream.format_record(msg) for msg in msgs_claimed]
34 changes: 33 additions & 1 deletion test/test_mixins/test_streams_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,6 @@ def test_xautoclaim(r: redis.Redis):
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=message_id2, justid=True) == [message_id2]


@pytest.mark.xfail
def test_xclaim_trimmed(r: redis.Redis):
# xclaim should not raise an exception if the item is not there
stream, group = "stream", "group"
Expand All @@ -674,3 +673,36 @@ def test_xclaim_trimmed(r: redis.Redis):
item = r.xclaim(stream, group, "consumer2", 0, [sid1, sid2])
assert len(item) == 1
assert item[0][0] == sid2


def test_xclaim(r: redis.Redis):
stream, group, consumer1, consumer2 = "stream", "group", "consumer1", "consumer2"

message_id = r.xadd(stream, {"john": "wick"})
message = get_stream_message(r, stream, message_id)
r.xgroup_create(stream, group, 0)

# trying to claim a message that isn't already pending doesn't
# do anything
assert r.xclaim(
stream, group, consumer2, min_idle_time=0, message_ids=(message_id,)
) == []

# read the group as consumer1 to initially claim the messages
r.xreadgroup(group, consumer1, streams={stream: ">"})

# claim the message as consumer2
assert r.xclaim(
stream, group, consumer2, min_idle_time=0, message_ids=(message_id,)
) == [message, ]

# reclaim the message as consumer1, but use the justid argument
# which only returns message ids
assert r.xclaim(
stream,
group,
consumer1,
min_idle_time=0,
message_ids=(message_id,),
justid=True,
) == [message_id, ]

0 comments on commit fa99fd5

Please sign in to comment.