Skip to content

Commit

Permalink
Make the RabbitMQ broker lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
timdrijvers committed Jan 31, 2020
1 parent b25f70c commit 31318f7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 22 deletions.
47 changes: 36 additions & 11 deletions dramatiq/brokers/rabbitmq.py
Expand Up @@ -106,9 +106,16 @@ def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_pri
self.max_priority = max_priority
self.connections = set()
self.channels = set()
self.queues = set()
# Queues maps queue name to boolean, indicating if the
# queue and underlying delayed queues are declared.
self.queues = {}
self.state = local()

@property
def is_connected(self):
"""Returns whether a connection is made, does not check for liveness"""
return getattr(self.state, "connection", None) is not None

@property
def connection(self):
"""The :class:`pika.BlockingConnection` for the current
Expand Down Expand Up @@ -187,6 +194,7 @@ def consume(self, queue_name, prefetch=1, timeout=5000):
Returns:
Consumer: A consumer that retrieves messages from RabbitMQ.
"""
self._check_and_declare_queue(queue_name)
return _RabbitmqConsumer(self.parameters, queue_name, prefetch, timeout)

def declare_queue(self, queue_name):
Expand All @@ -195,6 +203,22 @@ def declare_queue(self, queue_name):
Parameters:
queue_name(str): The name of the new queue.
"""
if queue_name not in self.queues:
self.emit_before("declare_queue", queue_name)
self.queues[queue_name] = False
self.emit_after("declare_queue", queue_name)

delayed_name = dq_name(queue_name)
self.delay_queues.add(delayed_name)
self.emit_after("declare_delay_queue", delayed_name)

def _check_and_declare_queue(self, queue_name):
"""Checks if a queue has been registered at RabbitMQ, if not
the queue will be registered
Parameters:
queue_name(str): The name of the queue.
Raises:
ConnectionClosed: If the underlying channel or connection
Expand All @@ -203,18 +227,17 @@ def declare_queue(self, queue_name):
attempts = 1
while True:
try:
if queue_name not in self.queues:
self.emit_before("declare_queue", queue_name)
self._declare_queue(queue_name)
self.queues.add(queue_name)
self.emit_after("declare_queue", queue_name)
# Make sure the queue is declared
self.declare_queue(queue_name)

delayed_name = dq_name(queue_name)
# If the queue hasn't been declared yet, do so now
if not self.queues[queue_name]:
self._declare_queue(queue_name)
self._declare_dq_queue(queue_name)
self.delay_queues.add(delayed_name)
self.emit_after("declare_delay_queue", delayed_name)

self._declare_xq_queue(queue_name)

# Mark as being declared
self.queues[queue_name] = True
break
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e: # pragma: no cover
Expand Down Expand Up @@ -270,6 +293,8 @@ def enqueue(self, message, *, delay=None):
has been closed.
"""
queue_name = message.queue_name
self._check_and_declare_queue(queue_name)

properties = pika.BasicProperties(
delivery_mode=2,
priority=message.options.get("broker_priority"),
Expand Down Expand Up @@ -322,7 +347,7 @@ def get_declared_queues(self):
set[str]: The names of all the queues declared so far on
this Broker.
"""
return self.queues.copy()
return set(self.queues.keys())

def get_queue_message_counts(self, queue_name):
"""Get the number of messages in a queue. This method is only
Expand Down
33 changes: 22 additions & 11 deletions tests/test_rabbitmq.py
Expand Up @@ -212,6 +212,28 @@ def do_work():
assert xq_count == 1


def test_rabbitmq_broker_doesnot_connect_on():
broker = RabbitmqBroker(
host="127.0.0.1",
max_priority=10,
credentials=RABBITMQ_CREDENTIALS,
)
assert broker.is_connected is False
broker.declare_queue("some-queue")
assert broker.is_connected is False


def test_consume_opens_connection():
broker = RabbitmqBroker(
host="127.0.0.1",
max_priority=10,
credentials=RABBITMQ_CREDENTIALS,
)
assert broker.is_connected is False
broker.consume("test-queue", timeout=1)
assert broker.is_connected is True


def test_rabbitmq_messages_belonging_to_missing_actors_are_rejected(rabbitmq_broker, rabbitmq_worker):
# Given that I have a broker without actors
# If I send it a message
Expand Down Expand Up @@ -418,14 +440,3 @@ def do_work():
assert executed
finally:
worker.stop()


def test_rabbitmq_broker_stops_retrying_declaring_queues_when_max_attempts_reached(rabbitmq_broker):
# Given that I have a rabbit instance that lost its connection
with patch.object(rabbitmq_broker, "_declare_queue", side_effect=pika.exceptions.AMQPConnectionError):
# When I declare an actor
# Then a ConnectionClosed error should be raised
with pytest.raises(dramatiq.errors.ConnectionClosed):
@dramatiq.actor(queue_name="flaky_queue")
def do_work():
pass

0 comments on commit 31318f7

Please sign in to comment.