Add adaptive backpressure and bound librdkafka internal memory#32
Merged
Conversation
- **Proportional batch sizing**: consume batch size scales linearly from `CONSUME_BATCH_SIZE` (buffer empty) down to 10 (buffer at the flush threshold). Smooths throughput during catchup and traffic spikes.
- **Bound librdkafka memory**: `queued.max.messages.kbytes=16384` (16 MB per partition) prevents librdkafka from pre-fetching unbounded data. This is the actual OOM prevention; batch sizing is throughput smoothing.
- **New metrics**: `millpond_buffer_fullness`, `millpond_consume_batch_size_current`.
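The linear scaling described above can be sketched as follows. Constant values and the function name are illustrative assumptions, not millpond's actual code:

```python
# Hedged sketch of proportional batch sizing. CONSUME_BATCH_SIZE's value
# here is an assumed placeholder; the floor of 10 comes from the PR text.
CONSUME_BATCH_SIZE = 500  # assumed full-speed batch size (buffer empty)
MIN_BATCH_SIZE = 10       # batch size at the flush threshold

def adaptive_batch_size(pending_bytes: int, flush_bytes: int) -> int:
    """Scale linearly from CONSUME_BATCH_SIZE (empty buffer) down to
    MIN_BATCH_SIZE (buffer at or past the flush threshold)."""
    fullness = pending_bytes / flush_bytes if flush_bytes else 0.0
    fullness = min(fullness, 1.0)  # fullness can exceed 1.0 past the threshold
    size = CONSUME_BATCH_SIZE - fullness * (CONSUME_BATCH_SIZE - MIN_BATCH_SIZE)
    return max(MIN_BATCH_SIZE, int(size))
```

One formula covers catchup (empty buffer, big batches), steady state, and bursts (full buffer, tiny batches), with no mode switching.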
Pull request overview
Adds adaptive consume backpressure and caps librdkafka’s internal consumer queue to reduce OOM risk during catchup and traffic spikes.
Changes:
- Introduces proportional batch sizing based on pending buffer fullness (`millpond/backpressure.py`) and wires it into the main consume loop.
- Adds a librdkafka queue memory bound via `queued.max.messages.kbytes`, with tests for it.
- Adds Prometheus gauges for buffer fullness and current adaptive batch size, plus documentation updates.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `millpond/backpressure.py` | New adaptive batch sizing logic + metric emission. |
| `millpond/main.py` | Uses adaptive batch size when calling `consumer.consume()`. |
| `millpond/consumer.py` | Sets `queued.max.messages.kbytes` to bound internal buffering. |
| `millpond/metrics.py` | Adds gauges for buffer fullness and current batch size. |
| `tests/unit/test_backpressure.py` | Unit tests for batch sizing + metric updates. |
| `tests/unit/test_consumer.py` | Unit test asserting the consumer config includes the queue bound. |
| `README.md` | Documents adaptive backpressure behavior and metrics. |
| `AGENT.md` | Adds design notes for adaptive backpressure and metrics. |
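The two gauges added in `millpond/metrics.py` might look like the following sketch using `prometheus_client`; the metric names come from the PR, while the help strings and set values are illustrative assumptions:

```python
# Sketch of the PR's two new Prometheus gauges (prometheus_client).
from prometheus_client import Gauge

BUFFER_FULLNESS = Gauge(
    "millpond_buffer_fullness",
    "Ratio of pending buffered bytes to the flush size threshold",
)
CONSUME_BATCH_SIZE_CURRENT = Gauge(
    "millpond_consume_batch_size_current",
    "Current adaptive consume batch size",
)

# Example values: a buffer 42% full and the batch size computed from it.
BUFFER_FULLNESS.set(0.42)
CONSUME_BATCH_SIZE_CURRENT.set(294)
```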
- Remove unused logger from backpressure.py
- Clamp max_batch_size to MIN_BATCH_SIZE in init()
- Use setdefault for queued.max.messages.kbytes (allow env override)
- Fix README formula to include int()
- Align docs: backpressure is throughput smoothing, not OOM prevention
- Fix AGENT.md: buffer_fullness can exceed 1.0
- Merge conflict resolution: include offset resume tests from main
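The setdefault change above (applying the queue bound only when no override is supplied) might look like this sketch; the function name and the idea of passing overrides in are assumptions, not millpond's actual API:

```python
# Hedged sketch: apply the 16 MB per-partition bound as a default,
# but let an externally supplied value (e.g. from the environment) win.
def build_consumer_config(overrides=None):
    conf = dict(overrides or {})
    conf.setdefault("queued.max.messages.kbytes", 16384)  # 16 MB cap
    return conf
```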
Summary
Two complementary mechanisms to prevent OOM during catchup and traffic spikes:
Adaptive batch sizing (`backpressure.py`)

Consume batch size scales proportionally with buffer fullness: buffer empty → consume at full speed; buffer approaching the flush threshold → consume in tiny batches. No state machine, no mode switching. Handles catchup, steady state, and bursts with one formula.
This is a throughput-smoothing mechanism — it controls how fast we dequeue from librdkafka's internal buffer.
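Wiring this into the consume loop might look like the sketch below. `consume(num_messages=..., timeout=...)` follows confluent-kafka's Consumer signature; every other name (`consume_once`, `sizer`, the buffer methods) is an assumption for illustration:

```python
# Hedged sketch of one consume-loop iteration: size the batch from
# current buffer fullness, then dequeue at most that many messages.
def consume_once(consumer, buffer, sizer, timeout=1.0):
    batch = sizer(buffer.pending_bytes())          # recompute every iteration
    for msg in consumer.consume(num_messages=batch, timeout=timeout):
        buffer.append(msg)
    return batch
```

Because the size is recomputed each iteration, backpressure adapts within a single catchup pass rather than per rebalance or per restart.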
librdkafka memory bound (`consumer.py`)

`queued.max.messages.kbytes=16384` (16 MB per partition). This is the OOM prevention mechanism. Without it, librdkafka pre-fetches up to 64 MB per partition regardless of consume rate. With 8 partitions (prod at 64 replicas), that's 512 MB of uncontrolled internal buffering. Now capped at 128 MB.

New metrics

- `millpond_buffer_fullness` — ratio of pending bytes to flush size (0.0 = empty, 1.0 = flush threshold)
- `millpond_consume_batch_size_current` — current adaptive batch size

Test plan