Skip to content
Browse files

Only publish one message at a time

[closes #14]
  • Loading branch information...
adamhooper committed Jul 9, 2019
1 parent 01aabf8 commit 599b02643c3161cbdf5285636488d71512401c91
Showing with 43 additions and 5 deletions.
  1. +12 −5 channels_rabbitmq/
  2. +31 −0 tests/
@@ -492,6 +492,11 @@ def __init__(
# Lock used to add/remove from groups atomically
self._groups_lock = asyncio.Lock()

# Lock used during send: we only send one message at a time. (An
# alternative approach would be to use a pool of channels for
# sending. It's not clear what that would win us.)
self._publish_lock = asyncio.Lock()

self._is_closed = False

# self._is_connected: means self._protocol and self._channel are
@@ -748,17 +753,18 @@ def _notify_connect_event(self):
connection.send({'foo': 'bar'})
message["__asgi_channel__"] = asgi_channel
message = {**message, "__asgi_channel__": asgi_channel}
message = msgpack.packb(message, use_bin_type=True)

queue_name = channel_to_queue_name(asgi_channel)
logger.debug("publish %r on %s", message, queue_name)

# Publish with publisher_confirms=True. Assume the server is configured
# with `overflow: reject-publish`, so we get a basic.nack if the queue
# length is exceeded.
logger.debug("publish %r on %s", message, queue_name)
await channel.publish(message, "", queue_name)
async with self._publish_lock:
await channel.publish(message, "", queue_name)
except PublishFailed:
raise ChannelFull()
@@ -819,13 +825,14 @@ def _notify_connect_event(self):

async def group_send(self, channel, group, message):
message["__asgi_group__"] = group
message = {**message, "__asgi_group__": group}
message = msgpack.packb(message, use_bin_type=True)

logger.debug("group_send %r to %s", message, group)

await channel.publish(message, self.groups_exchange, routing_key=group)
async with self._publish_lock:
await channel.publish(message, self.groups_exchange, routing_key=group)
except PublishFailed:
# The Channels protocol has no way of reporting this error.
# Just silently delete the message.
@@ -454,3 +454,34 @@ def factory(queue_name, **kwargs):
t2 = time.time()
assert t2 - t1 >= ReconnectDelay, "send() should stall until reconnect"
assert (await connection2.receive("x!y"))["type"] == "test.2"

async def test_concurrent_send(connect):
    """Ensure all frames for one AMQP message are sent before another is sent.

    Regression test for bug #14: without a mutex around publish, the frames
    of two concurrently-published AMQP messages could be interleaved on the
    wire, corrupting both messages.
    """
    connection = connect("x")
    await connection.group_add("test-group", "x!1")

    # Send lots of concurrent messages: both with group_send() and send().
    # We're sending concurrently. Order doesn't matter.
    # It can take _lots_ of messages to trigger bug #14 locally. Prior to the
    # bugfix (a mutex during publish), messages' frames could be interwoven;
    # but with Python 3.6 on Linux that happened rarely when sending fewer than
    # 100 concurrent messages locally.
    texts = set(f"x{i}" for i in range(100))
    messages = [{"type": text} for text in texts]
    group_sends = [connection.group_send("test-group", m) for m in messages]
    direct_sends = [connection.send("x!2", m) for m in messages]
    group_receives = [asyncio.ensure_future(connection.receive("x!1")) for _ in texts]
    direct_receives = [asyncio.ensure_future(connection.receive("x!2")) for _ in texts]
    # Run every send and receive concurrently; gather() waits for them all.
    await asyncio.gather(
        *(group_sends + direct_sends + group_receives + direct_receives)
    )

    # Every message must arrive intact on both the group and direct channels.
    assert set([m.result()["type"] for m in group_receives]) == texts
    assert set([m.result()["type"] for m in direct_receives]) == texts

0 comments on commit 599b026

Please sign in to comment.
You can’t perform that action at this time.