Skip to content

Commit

Permalink
Testing consumer group creation and noop message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcharnock committed Apr 18, 2018
1 parent 03acbe0 commit 3ca7b40
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 7 deletions.
3 changes: 3 additions & 0 deletions lightbus/serializers/by_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def __call__(self, serialized: dict):
k = decode_bytes(k)
v = decode_bytes(v)

if not k:
continue

# kwarg fields start with a ':', everything else is metadata
if k[0] == ':':
# kwarg values need decoding
Expand Down
13 changes: 10 additions & 3 deletions lightbus/transports/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ async def _fetch_new_messages(self, streams, consumer_group, forever):
)
for stream, message_id, fields in pending_messages:
event_message = self._fields_to_message(fields)
if not event_message:
continue # noop message
logger.debug(LBullets(
L("⬅ Receiving pending event {} on stream {}", Bold(message_id), Bold(stream)),
items=dict(**event_message.get_metadata(), kwargs=event_message.get_kwargs())
Expand Down Expand Up @@ -464,6 +466,8 @@ async def _fetch_new_messages(self, streams, consumer_group, forever):
# Handle the messages we have received
for stream, message_id, fields in stream_messages:
event_message = self._fields_to_message(fields)
if not event_message:
continue # noop message
logger.debug(LBullets(
L("⬅ Received new event {} on stream {}", Bold(message_id), Bold(stream)),
items=dict(**event_message.get_metadata(), kwargs=event_message.get_kwargs())
Expand All @@ -476,7 +480,7 @@ async def _fetch_new_messages(self, streams, consumer_group, forever):
return

async def _reclaim_lost_messages(self, stream_names, consumer_group):
"""Reclaim messages that others consumers in the group failed to acknowledge"""
"""Reclaim messages that other consumers in the group failed to acknowledge"""
with await self.connection_manager() as redis:
for stream in stream_names:
old_messages = await redis.xpending(stream, consumer_group, '-', '+', count=100)
Expand All @@ -492,6 +496,8 @@ async def _reclaim_lost_messages(self, stream_names, consumer_group):
result = await redis.xclaim(stream, consumer_group, self.consumer_name, int(timeout), message_id)
for claimed_message_id, fields in result:
event_message = self._fields_to_message(fields)
if not event_message:
continue # noop message
logger.debug(LBullets(
L("⬅ Reclaimed timed out event {} on stream {}. Abandoned by {}.",
Bold(message_id), Bold(stream), Bold(consumer_name)),
Expand All @@ -503,7 +509,6 @@ async def _create_consumer_groups(self, streams, redis, consumer_group):
for stream, since in streams.items():
if not await redis.exists(stream):
# Add a noop to ensure the stream exists
# TODO: Test to ensure noops are ignored in fetch()
await redis.xadd(stream, fields={'': ''})

try:
Expand All @@ -513,7 +518,9 @@ async def _create_consumer_groups(self, streams, redis, consumer_group):
if 'BUSYGROUP' not in str(e):
raise

def _fields_to_message(self, fields) -> Optional[EventMessage]:
    """Deserialize raw Redis stream fields into an EventMessage.

    Returns None when the fields are the noop placeholder ({b'': b''})
    that gets XADDed purely to create a stream before registering a
    consumer group — such messages must be ignored by consumers.
    """
    noop_marker = ((b'', b''),)
    if tuple(fields.items()) == noop_marker:
        return None
    return self.deserializer(fields)


Expand Down
37 changes: 33 additions & 4 deletions tests/redis_transports/test_unit_redis_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ async def co_consume():
async for message_ in redis_event_transport.consume([('my.dummy', 'my_event')], {}, loop):
messages.append(message_)

asyncio.ensure_future(co_consume())
asyncio.ensure_future(co_consume())
task1 = asyncio.ensure_future(co_consume())
task2 = asyncio.ensure_future(co_consume())

await asyncio.sleep(0.1)
await redis_client.xadd('my.dummy.my_event:stream', fields={
Expand All @@ -80,6 +80,8 @@ async def co_consume():
# Two messages, plus two dummy values which indicate events have been acked
assert len(messages) == 4

await cancel(task1, task2)


@pytest.mark.run_loop
async def test_consume_events_multiple_consumers_one_group(loop, redis_pool, redis_client, dummy_api):
Expand Down Expand Up @@ -350,9 +352,10 @@ async def consume():
async for message in consumer:
messages.append(message)

asyncio.ensure_future(consume(), loop=loop)
task = asyncio.ensure_future(consume(), loop=loop)
await asyncio.sleep(0.1)
assert len(messages) == 1
await cancel(task)


@pytest.mark.run_loop
Expand Down Expand Up @@ -392,9 +395,35 @@ async def consume():
async for message in consumer:
messages.append(message)

asyncio.ensure_future(consume(), loop=loop)
task = asyncio.ensure_future(consume(), loop=loop)
await asyncio.sleep(0.1)
assert len(messages) == 1
assert messages[0].api_name == 'my.dummy'
assert messages[0].event_name == 'my_event'
assert messages[0].kwargs == {'field': 'value'}

await cancel(task)


@pytest.mark.run_loop
async def test_consume_events_create_consumer_group_first(loop, redis_client, redis_event_transport, dummy_api):
    """Register a consumer group before the stream exists.

    The transport creates the stream by adding a noop message; that noop
    must be swallowed by the transport and never reach the consumer.
    """
    received = []

    consumer = redis_event_transport.consume(
        listen_for=[('my.dummy', 'my_event')],
        since='0',
        loop=loop,
        context={},
        consumer_group='test_group',
    )

    async def collect():
        async for event_message in consumer:
            received.append(event_message)

    consume_task = asyncio.ensure_future(collect(), loop=loop)
    await asyncio.sleep(0.1)
    # Only the noop existed on the stream, so nothing should be received
    assert not received
    await cancel(consume_task)
13 changes: 13 additions & 0 deletions tests/serializers/test_by_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,16 @@ def test_by_field_deserializer():
assert message.api_name == 'my.api'
assert message.event_name == 'my_event'
assert message.kwargs == {'field': 'value'}


def test_by_field_deserializer_empty_keys_and_values():
    """Empty-keyed fields (noop placeholders) are skipped during deserialization."""
    raw_fields = {
        'api_name': 'my.api',
        'event_name': 'my_event',
        ':field': '"value"',
        '': '',  # noop placeholder — must not break deserialization
    }
    message = ByFieldMessageDeserializer(EventMessage)(raw_fields)
    assert (message.api_name, message.event_name) == ('my.api', 'my_event')
    assert message.kwargs == {'field': 'value'}

0 comments on commit 3ca7b40

Please sign in to comment.