diff --git a/lightbus/transports/redis.py b/lightbus/transports/redis.py index 9ee372cb..975e751d 100644 --- a/lightbus/transports/redis.py +++ b/lightbus/transports/redis.py @@ -409,6 +409,7 @@ async def fetch_loop(): await queue.put(message) async def reclaim_loop(): + # TODO: Test reclaiming await asyncio.sleep(self.acknowledgement_timeout) async for message in self._reclaim_lost_messages(stream_names, consumer_group): await queue.put(message) @@ -430,6 +431,7 @@ async def _fetch_new_messages(self, streams, consumer_group, forever): # Get any messages that this consumer has yet to process. # This can happen in the case where the processes died before acknowledging. + # TODO: Test that pending messages get picked up pending_messages = await redis.xread_group( group_name=consumer_group, consumer_name=self.consumer_name,