-
Notifications
You must be signed in to change notification settings - Fork 22
/
test_reliability_transport.py
53 lines (39 loc) · 1.49 KB
/
test_reliability_transport.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
import logging
import pytest
from lightbus.transports.transactional import lightbus_set_database
pytestmark = pytest.mark.reliability
@pytest.mark.asyncio
async def test_multiple_connections(
transactional_bus_factory,
pg_kwargs,
test_table,
loop,
dummy_api,
messages_in_redis,
get_outbox,
caplog,
cursor_factory,
dbapi_database,
):
import aiopg
# We're going to send a lot of messages. Let's keep the logging sane
caplog.set_level(logging.WARNING)
# Migrate now
await dbapi_database.migrate()
async def start_firing(number):
async with aiopg.connect(loop=loop, **pg_kwargs) as connection:
async with connection.cursor(cursor_factory=cursor_factory) as cursor:
bus = await transactional_bus_factory()
await bus.client.register_api_async(dummy_api)
for x in range(0, 50):
async with lightbus_set_database(bus, connection, apis=["my.dummy"]):
await bus.my.dummy.my_event.fire_async(field="a")
await cursor.execute(
"INSERT INTO test_table VALUES (%s)", [f"{number}-{x}"]
)
await bus.client.close_async()
await asyncio.gather(*[start_firing(n) for n in range(0, 5)])
assert await test_table.total_rows() == 250
assert len(await get_outbox()) == 0
assert len(await messages_in_redis("my.dummy", "my_event")) == 250