queue_processors: Set a bounded prefetch size on rabbitmq queues.
RabbitMQ clients have a setting called prefetch[1], which controls how
many un-acknowledged events the server forwards to the local queue in
the client.  The default is 0; this means that when clients first
connect, the server must send them every message in the queue.

This itself may cause unbounded memory usage in the client, but also
has other detrimental effects.  While the client is attempting to
process the head of the queue, it may be unable to read from the TCP
socket at the rate that the server is sending to it -- filling the TCP
buffers, and causing the server's writes to block.  If the server
blocks for more than 30 seconds, it times out the send, and closes the
connection with:

```
closing AMQP connection <0.30902.126> (127.0.0.1:53870 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}
```

This failure mode is described in pika/pika#753 (comment).
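
The flow-control effect described above can be modeled in a few lines of plain Python: the server only hands out more messages while the count of un-acknowledged deliveries is below the prefetch limit, with 0 meaning "unlimited". This is an illustrative sketch of the AMQP prefetch rule, not pika's or RabbitMQ's actual implementation; the function name `deliverable` is invented for the example.

```python
def deliverable(queue_depth: int, unacked: int, prefetch: int) -> int:
    """How many more messages the server may push to this consumer.

    prefetch == 0 means "unlimited": on connect, the server pushes the
    entire queue regardless of how many deliveries are un-acknowledged.
    """
    if prefetch == 0:
        return queue_depth
    return max(0, min(queue_depth, prefetch - unacked))

# With no limit, a freshly connected client is sent all 50,000 messages.
print(deliverable(queue_depth=50_000, unacked=0, prefetch=0))      # 50000
# With prefetch=100, at most 100 deliveries are in flight at once.
print(deliverable(queue_depth=50_000, unacked=0, prefetch=100))    # 100
# Once 100 are outstanding, the server sends nothing until some are acked.
print(deliverable(queue_depth=50_000, unacked=100, prefetch=100))  # 0
```

Under this model, a bounded prefetch caps both client memory and the burst of writes the server attempts on connect, which is exactly the blocked-write timeout the log above shows.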

Set a prefetch limit of 100 messages, or the batch size, to better
handle queues which start with large numbers of outstanding events.

Setting prefetch=1 causes significant performance degradation in the
no-op queue worker, to 30% of the prefetch=0 performance.  Setting
prefetch=100 achieves 90% of the prefetch=0 performance, and higher
values offer only minor gains above that.  Batch workers are not
notably degraded by a prefetch equal to their batch size, and they
cannot function with a prefetch smaller than their batch size.
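
The batch-worker constraint follows directly: a worker that waits for `batch_size` messages before processing can never fill a batch if fewer than `batch_size` un-acknowledged deliveries may be in flight. A minimal sketch of the resulting rule, mirroring the `max()` used in the patch below (the helper name `effective_prefetch` is invented for illustration):

```python
def effective_prefetch(prefetch: int, batch_size: int = 1) -> int:
    """Prefetch to request from the server for a (possibly batching) worker.

    A batch worker blocks until batch_size messages arrive, so the server
    must be allowed at least that many un-acknowledged deliveries in
    flight; a smaller prefetch would leave the worker waiting forever.
    """
    return max(prefetch, batch_size)

print(effective_prefetch(100))                  # plain worker: 100
print(effective_prefetch(100, batch_size=500))  # large batches: 500
```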

We also set a 100-count prefetch on Tornado workers, as they are
potentially susceptible to the same effect.

[1] https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
alexmv authored and danielpyon committed Dec 1, 2021
1 parent 5dee2dc commit 5f1984c
Showing 2 changed files with 14 additions and 2 deletions.
zerver/lib/queue.py (4 additions, 1 deletion)

```diff
@@ -252,7 +252,10 @@ class TornadoQueueClient(QueueClient[Channel]):
     def __init__(self) -> None:
         super().__init__(
             # TornadoConnection can process heartbeats, so enable them.
-            rabbitmq_heartbeat=None
+            rabbitmq_heartbeat=None,
+            # Only ask for 100 un-acknowledged messages at once from
+            # the server, rather than an unbounded number.
+            prefetch=100,
         )
         self._on_open_cbs: List[Callable[[Channel], None]] = []
         self._connection_failure_count = 0
```
zerver/worker/queue_processors.py (10 additions, 1 deletion)

```diff
@@ -219,6 +219,12 @@ class QueueProcessingWorker(ABC):
     CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50
     MAX_SECONDS_BEFORE_UPDATE_STATS = 30
 
+    # How many un-acknowledged events the worker should have on hand,
+    # fetched from the rabbitmq server.  Larger values may be more
+    # performant, but if queues are large, cause more network IO at
+    # startup and steady-state memory.
+    PREFETCH = 100
+
     def __init__(self) -> None:
         self.q: Optional[SimpleQueueClient] = None
         if not hasattr(self, "queue_name"):
@@ -390,7 +396,7 @@ def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exc
         check_and_send_restart_signal()
 
     def setup(self) -> None:
-        self.q = SimpleQueueClient()
+        self.q = SimpleQueueClient(prefetch=self.PREFETCH)
 
     def start(self) -> None:
         assert self.q is not None
@@ -409,6 +415,9 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
     sleep_delay = 1
     batch_size = 100
 
+    def setup(self) -> None:
+        self.q = SimpleQueueClient(prefetch=max(self.PREFETCH, self.batch_size))
+
     def start(self) -> None:  # nocoverage
         assert self.q is not None
         self.initialize_statistics()
```
