Skip to content

Commit

Permalink
Acking messages now handled in a more sane way
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcharnock committed Apr 17, 2018
1 parent d64013b commit b2fd495
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 18 deletions.
8 changes: 4 additions & 4 deletions lightbus/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,10 @@ async def listen_for_event_task(event_transport, events):
if inspect.isawaitable(co):
await co

# We manually acknowledge here rather than rely on the consumer to continue
# post-yielding. This allows us to trigger the 'after_event_execution' plugin hook
# in the knowledge that the message has actually been acknowledged.
await event_message.acknowledge()
# Await the consumer again, which is our way of allowing it to
# acknowledge the message. This then allows us to fire the
# `after_event_execution` plugin hook immediately afterwards
await consumer.__anext__()
await plugin_hook('after_event_execution', event_message=event_message, bus_client=self)

# Get the events transports for the selection of APIs that we are listening on
Expand Down
20 changes: 7 additions & 13 deletions lightbus/transports/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ async def _fetch_new_messages(self, streams, consumer_group, forever):
timeout=None, # Don't block, return immediately
)
for stream, message_id, fields in pending_messages:
event_message = self._fields_to_message(redis, stream, message_id, fields, consumer_group)
event_message = self._fields_to_message(fields)
logger.debug(LBullets(
L("⬅ Receiving pending event {} on stream {}", Bold(message_id), Bold(stream)),
items=dict(**event_message.get_metadata(), kwargs=event_message.get_kwargs())
Expand Down Expand Up @@ -469,13 +469,14 @@ async def _fetch_new_messages(self, streams, consumer_group, forever):

# Handle the messages we have received
for stream, message_id, fields in stream_messages:
event_message = self._fields_to_message(redis, fields, message_id, fields, consumer_group)
event_message = self._fields_to_message(fields)
logger.debug(LBullets(
L("⬅ Received new event {} on stream {}", Bold(message_id), Bold(stream)),
items=dict(**event_message.get_metadata(), kwargs=event_message.get_kwargs())
))
yield event_message
# Acknowledging is handled on the message itself.
await redis.xack(stream, consumer_group, message_id)
yield True

if not forever:
return
Expand All @@ -497,9 +498,7 @@ async def _reclaim_lost_messages(self, stream_names, consumer_group):

result = await redis.xclaim(stream, consumer_group, self.consumer_name, timeout, message_id)
for claimed_message_id, fields in result:
event_message = self._fields_to_message(
redis, fields, claimed_message_id, fields, consumer_group
)
event_message = self._fields_to_message(fields)
logger.debug(LBullets(
L("⬅ Reclaimed timed out event {} on stream {}. Abandoned by {}.",
Bold(message_id), Bold(stream), Bold(consumer_name)),
Expand All @@ -521,13 +520,8 @@ async def _create_consumer_groups(self, streams, redis, consumer_group):
if 'BUSYGROUP' not in str(e):
raise

def _fields_to_message(self, redis, stream, message_id, fields, consumer_group) -> EventMessage:
event_message = self.deserializer(fields)
event_message.on_ack = partial(self._do_ack, redis, stream, consumer_group, decode(message_id, 'utf8'))
return event_message

def _do_ack(self, redis, stream, consumer_group, message_id):
return redis.xack(stream, consumer_group, message_id)
def _fields_to_message(self, fields) -> EventMessage:
return self.deserializer(fields)


class RedisSchemaTransport(RedisTransportMixin, SchemaTransport):
Expand Down
3 changes: 2 additions & 1 deletion tests/redis_transports/test_integration_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def co_consume_rpcs():
@pytest.mark.run_loop
async def test_event(bus: lightbus.BusNode, dummy_api):
"""Full event integration test"""

manually_set_plugins({})
received_kwargs = []
received_api_name = None
received_event_name = None
Expand All @@ -115,6 +115,7 @@ async def listener(api_name, event_name, **kwargs):
await bus.my.dummy.my_event.listen_async(listener)
await asyncio.sleep(0.01)
await bus.my.dummy.my_event.fire_async(field='Hello! 😎')
await asyncio.sleep(0.01)

# await asyncio.gather(co_fire_event(), co_listen_for_events())
assert received_kwargs == [{'field': 'Hello! 😎'}]
Expand Down

0 comments on commit b2fd495

Please sign in to comment.