Skip to content

Commit

Permalink
feat[stream]:implement XAUTOCLAIM
Browse files Browse the repository at this point in the history
fix #158
  • Loading branch information
cunla committed Jun 19, 2023
1 parent 5f5e6c6 commit 79285ae
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/about/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description: Change log of all fakeredis releases
- Implemented support for various stream groups commands:
- `XGROUP CREATE` #161, `XGROUP DESTROY` #164, `XGROUP SETID` #165, `XGROUP DELCONSUMER` #162,
`XGROUP CREATECONSUMER` #163, `XINFO GROUPS` #168, `XINFO CONSUMERS` #168, `XINFO STREAM` #169, `XREADGROUP` #171,
`XACK` #157, `XPENDING` #170, `XCLAIM` #159
`XACK` #157, `XPENDING` #170, `XCLAIM` #159, `XAUTOCLAIM` #158
- Implemented sorted set commands:
- `ZRANDMEMBER` #192, `ZDIFF` #187, `ZINTER` #189, `ZUNION` #194, `ZDIFFSTORE` #188,
`ZINTERCARD` #190, `ZRANGESTORE` #193
Expand Down
11 changes: 4 additions & 7 deletions docs/redis-commands/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,10 @@ Returns the number of messages that were successfully acknowledged by the consum

Appends a new message to a stream. Creates the key if it doesn't exist.

### [XAUTOCLAIM](https://redis.io/commands/xautoclaim/)

Changes, or acquires, ownership of messages in a consumer group, as if the messages were delivered to as consumer group member.

### [XCLAIM](https://redis.io/commands/xclaim/)

Changes, or acquires, ownership of a message in a consumer group, as if the message was delivered a consumer group member.
Expand Down Expand Up @@ -1511,11 +1515,4 @@ Returns the messages from a stream within a range of IDs in reverse order.
Deletes messages from the beginning of a stream.


### Unsupported stream commands
> To implement support for a command, see [here](../../guides/implement-command/)
#### [XAUTOCLAIM](https://redis.io/commands/xautoclaim/) <small>(not implemented)</small>

Changes, or acquires, ownership of messages in a consumer group, as if the messages were delivered to as consumer group member.


29 changes: 24 additions & 5 deletions fakeredis/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,31 +211,50 @@ def pending_summary(self) -> List[bytes]:
]
return data

def claim(self, min_idle_ms: int, msgs: List[bytes], consumer_name: bytes, _time: Optional[int],
force: bool) -> List:
def claim(self,
min_idle_ms: int,
msgs: List[bytes],
consumer_name: bytes,
_time: Optional[int],
force: bool) -> Tuple[List, List]:
curr_time = current_time()
if _time is None:
_time = curr_time
self.consumers.get(consumer_name, StreamConsumerInfo(consumer_name)).last_attempt = curr_time
claimed_msgs = []
claimed_msgs, deleted_msgs = [], []
for msg in msgs:
try:
key = StreamEntryKey.parse_str(msg)
key = StreamEntryKey.parse_str(msg) if isinstance(msg, bytes) else msg
except Exception:
continue
if key not in self.pel:
if force:
self.pel[key] = (consumer_name, _time) # Force claim msg
if key in self.stream:
claimed_msgs.append(key)
else:
deleted_msgs.append(key)
del self.pel[key]
continue
if curr_time - self.pel[key][1] < min_idle_ms:
continue # Not idle enough time to be claimed
self.pel[key] = (consumer_name, _time)
if key in self.stream:
claimed_msgs.append(key)
else:
deleted_msgs.append(key)
del self.pel[key]
self._calc_consumer_last_time()
return claimed_msgs
return sorted(claimed_msgs), sorted(deleted_msgs)

def read_pel_msgs(self, min_idle_ms: int, start: bytes, count: int):
start_key = StreamEntryKey.parse_str(start)
curr_time = current_time()
msgs = sorted([k for k in self.pel
if (curr_time - self.pel[k][1] >= min_idle_ms)
and k >= start_key])
count = min(count, len(msgs))
return msgs[:count]


class XStream:
Expand Down
26 changes: 25 additions & 1 deletion fakeredis/commands_mixins/streams_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,31 @@ def xclaim(self, key, group_name, consumer_name, min_idle_ms, *args):

if idle is not None and idle > 0 and _time is None:
_time = current_time() - idle
msgs_claimed = sorted(group.claim(min_idle_ms, msg_ids, consumer_name, _time, force))
msgs_claimed, _ = group.claim(min_idle_ms, msg_ids, consumer_name, _time, force)

if justid:
return [msg.encode() for msg in msgs_claimed]
return [stream.format_record(msg) for msg in msgs_claimed]

@command(name="XAUTOCLAIM", fixed=(Key(XStream), bytes, bytes, Int, bytes), repeat=(bytes,), )
def xautoclaim(self, key, group_name, consumer_name, min_idle_ms, start, *args):
(count, justid), _ = extract_args(args, ('+count', 'justid'))
count = count or 100
stream = key.value
if stream is None:
raise SimpleError(msgs.XGROUP_KEY_NOT_FOUND_MSG)
group: StreamGroup = stream.group_get(group_name)
if not group:
raise SimpleError(msgs.XGROUP_GROUP_NOT_FOUND_MSG.format(key, group_name))

keys = group.read_pel_msgs(min_idle_ms, start, count)
msgs_claimed, msgs_removed = group.claim(min_idle_ms, keys, consumer_name, None, False)

res = [
max(msgs_claimed).encode() if len(msgs_claimed) > 0 else start,
[msg.encode() for msg in msgs_claimed] if justid
else [stream.format_record(msg) for msg in msgs_claimed],
]
if self.version >= (7,):
res.append([msg.encode() for msg in msgs_removed])
return res
42 changes: 33 additions & 9 deletions test/test_mixins/test_streams_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,8 +625,8 @@ def test_xpending_range_negative(r: redis.Redis):
r.xpending_range(stream, group, min=None, max=None, count=None, consumername=0)


@pytest.mark.xfail
def test_xautoclaim(r: redis.Redis):
@pytest.mark.max_server('6.3')
def test_xautoclaim_redis6(r: redis.Redis):
stream, group, consumer1, consumer2 = "stream", "group", "consumer1", "consumer2"

message_id1 = r.xadd(stream, {"john": "wick"})
Expand All @@ -652,7 +652,35 @@ def test_xautoclaim(r: redis.Redis):
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=message_id2, justid=True) == [message_id2]


def test_xclaim_trimmed(r: redis.Redis):
@pytest.mark.min_server('7')
def test_xautoclaim_redis7(r: redis.Redis):
stream, group, consumer1, consumer2 = "stream", "group", "consumer1", "consumer2"

message_id1 = r.xadd(stream, {"john": "wick"})
message_id2 = r.xadd(stream, {"johny": "deff"})
message = get_stream_message(r, stream, message_id1)
r.xgroup_create(stream, group, 0)

# trying to claim a message that isn't already pending doesn't
# do anything
assert r.xautoclaim(stream, group, consumer2, min_idle_time=0) == [b"0-0", [], []]

# read the group as consumer1 to initially claim the messages
r.xreadgroup(group, consumer1, streams={stream: ">"})

# claim one message as consumer2
response = r.xautoclaim(stream, group, consumer2, min_idle_time=0, count=1)
assert response[1] == [message]

# reclaim the messages as consumer1, but use the justid argument
# which only returns message ids
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=0, justid=True) == [
message_id1, message_id2]
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0, start_id=message_id2, justid=True) == [message_id2]


@pytest.mark.min_server('7')
def test_xclaim_trimmed_redis7(r: redis.Redis):
# xclaim should not raise an exception if the item is not there
stream, group = "stream", "group"

Expand Down Expand Up @@ -699,10 +727,6 @@ def test_xclaim(r: redis.Redis):
# reclaim the message as consumer1, but use the justid argument
# which only returns message ids
assert r.xclaim(
stream,
group,
consumer1,
min_idle_time=0,
message_ids=(message_id,),
justid=True,
stream, group, consumer1,
min_idle_time=0, message_ids=(message_id,), justid=True,
) == [message_id, ]

0 comments on commit 79285ae

Please sign in to comment.