Merged
13 changes: 13 additions & 0 deletions AGENT.md
@@ -91,6 +91,19 @@ The S3 credentials use DuckDB-specific env var names, **not** the standard `AWS_

`aws-msk-iam-sasl-signer-python` is an optional dependency (`pip install millpond[msk-iam]`) but the Dockerfile always installs it (`--extra msk-iam`). All production deployments use IAM auth. The optional dep is for local dev where boto3/botocore (~15MB) may not be wanted.

### Adaptive Backpressure

`backpressure.py` implements proportional batch sizing based on buffer fullness. The consume batch size scales linearly from `CONSUME_BATCH_SIZE` (max, when buffer is empty) down to 10 (min, when buffer is at flush threshold). No state machine, no mode switching — one formula:

```
fullness = pending_bytes / flush_size
batch_size = max(10, int(max_batch * (1.0 - fullness)))
```

This smooths throughput without manual tuning: during catchup the buffer fills quickly, so the batch size drops and flushes happen frequently with small batches; at steady state the buffer is mostly empty and the batch size stays at max. OOM prevention comes from `queued.max.messages.kbytes` (set in consumer.py), which bounds librdkafka's internal fetch buffer per partition.

Metrics: `millpond_buffer_fullness` (0.0 = empty, can exceed 1.0 if buffer overshoots flush threshold) and `millpond_consume_batch_size_current` for monitoring.
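
As a quick sanity check of the formula, here is a standalone sketch using this section's stated defaults (max batch 1000, floor of 10) — a toy reimplementation, not the module itself:

```python
# Standalone sketch of the proportional sizing formula, using the
# defaults described above (max batch 1000, floor of 10).
MIN_BATCH = 10
MAX_BATCH = 1000

def batch_size(pending_bytes: int, flush_size: int) -> int:
    if flush_size <= 0:
        return MAX_BATCH  # no threshold configured: run at full speed
    fullness = pending_bytes / flush_size
    return max(MIN_BATCH, int(MAX_BATCH * (1.0 - fullness)))

# Empty buffer -> max, half full -> half, at or past threshold -> floor.
print(batch_size(0, 100_000_000))            # 1000
print(batch_size(50_000_000, 100_000_000))   # 500
print(batch_size(100_000_000, 100_000_000))  # 10
print(batch_size(150_000_000, 100_000_000))  # 10
```

Note that overshoot past the flush threshold (fullness above 1.0) still clamps to the floor rather than going negative.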

### Language: Python

The hot path is all C/C++ (librdkafka, orjson, PyArrow, DuckDB). Python is glue — it touches each record once to pass a parsed dict into a list. Performance bottleneck is S3 write latency, not Python.
11 changes: 11 additions & 0 deletions README.md
@@ -36,6 +36,17 @@ K8s StatefulSet (N replicas)
- Static partition assignment via pod ordinal — no consumer groups
- If a pod dies, its partitions stop being consumed until K8s restarts it

## Adaptive Backpressure

The consume batch size automatically scales based on how full the pending buffer is relative to the flush threshold. When the buffer is empty, millpond consumes at full speed. As the buffer approaches the flush size, the batch size drops proportionally, smoothing throughput during catchup and traffic spikes. OOM prevention comes from bounding librdkafka's internal fetch buffer via `queued.max.messages.kbytes` (16MB per partition).

```
fullness = pending_bytes / flush_size
batch_size = max(10, int(CONSUME_BATCH_SIZE * (1.0 - fullness)))
```

Metrics: `millpond_buffer_fullness` and `millpond_consume_batch_size_current`.
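
To see the proportional response in action, here is a toy catchup simulation (the record size and thresholds are illustrative assumptions, not millpond's code): each iteration consumes one batch, the buffer grows, and the next batch shrinks.

```python
# Toy catchup simulation. Assumptions (mine, for illustration):
# 1 KB average records, 100 MB flush threshold, max batch 1000.
FLUSH_SIZE = 100_000_000   # assumed flush threshold in bytes
RECORD_BYTES = 1_000       # assumed average record size

def next_batch(pending: int, max_batch: int = 1000, floor: int = 10) -> int:
    return max(floor, int(max_batch * (1.0 - pending / FLUSH_SIZE)))

pending = 0
sizes = []
for _ in range(5):
    b = next_batch(pending)
    sizes.append(b)
    pending += b * RECORD_BYTES  # consumed records land in the pending buffer

print(sizes)  # [1000, 990, 980, 970, 960] — shrinking as the buffer fills
```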

## Performance

The hot path is all C/C++: librdkafka → orjson → PyArrow → DuckDB (zero-copy Arrow scan). Python is glue.
50 changes: 50 additions & 0 deletions millpond/backpressure.py
@@ -0,0 +1,50 @@
"""Adaptive batch sizing based on buffer fullness.

Implements proportional backpressure: when the pending buffer approaches
the flush threshold, reduce the consume batch size to smooth throughput.
When the buffer is mostly empty, consume at full speed.

fullness = pending_bytes / flush_size # 0.0 to 1.0+
batch_size = max(MIN, int(MAX * (1.0 - fullness)))

This handles catchup, steady state, and traffic spikes with a single
code path. No state machine, no mode switching.

NOTE: This is a throughput-smoothing mechanism, not OOM prevention.
The actual memory safety knob is librdkafka's `queued.max.messages.kbytes`
(set in consumer.py), which bounds the internal fetch buffer per partition.
Without that, librdkafka pre-fetches up to 64MB per partition regardless
of how slowly we dequeue.
"""

from millpond import metrics

# Minimum batch size — never go below this to avoid per-call overhead domination
MIN_BATCH_SIZE = 10

# The batch size at zero buffer fullness (max throughput)
# Overridden at init from cfg.consume_batch_size
_max_batch_size: int = 1000


def init(max_batch_size: int) -> None:
"""Set the max batch size from config. Called once at startup."""
global _max_batch_size
_max_batch_size = max(max_batch_size, MIN_BATCH_SIZE)


def compute_batch_size(pending_bytes: int, flush_size: int) -> int:
"""Compute the adaptive batch size based on buffer fullness.

Returns a value between MIN_BATCH_SIZE and _max_batch_size.
"""
if flush_size <= 0:
return _max_batch_size

fullness = pending_bytes / flush_size
batch_size = max(MIN_BATCH_SIZE, int(_max_batch_size * (1.0 - fullness)))

metrics.buffer_fullness.set(round(fullness, 3))
metrics.consume_batch_size_current.set(batch_size)

return batch_size
3 changes: 3 additions & 0 deletions millpond/consumer.py
@@ -133,6 +133,9 @@ def create(cfg: Config) -> Consumer:
"statistics.interval.ms": cfg.stats_interval_ms,
"stats_cb": _on_stats,
}
# Default to 16MB per partition to bound librdkafka internal memory.
# Allow override via KAFKA_CONSUMER_QUEUED_MAX_MESSAGES_KBYTES.
consumer_config.setdefault("queued.max.messages.kbytes", 16384)

consumer = Consumer(consumer_config)

6 changes: 4 additions & 2 deletions millpond/main.py
@@ -6,7 +6,7 @@
import pyarrow as pa
from confluent_kafka import TopicPartition

from millpond import arrow_converter, config, consumer, ducklake, logging_config, metrics, schema, server
from millpond import arrow_converter, backpressure, config, consumer, ducklake, logging_config, metrics, schema, server

log = logging.getLogger(__name__)

@@ -132,6 +132,7 @@ def main():
db = ducklake.connect(cfg)
schema_mgr = schema.SchemaManager(db, cfg.ducklake_table)
kafka = consumer.create(cfg)
backpressure.init(cfg.consume_batch_size)
server.health.mark_started()

shutdown = False
@@ -158,7 +159,8 @@ def on_signal(signum, _frame):
remaining = cfg.flush_interval_s - (time.monotonic() - last_flush)
timeout = max(remaining, 0.1)

msgs = kafka.consume(num_messages=cfg.consume_batch_size, timeout=timeout)
batch_size = backpressure.compute_batch_size(pending_bytes, cfg.flush_size)
msgs = kafka.consume(num_messages=batch_size, timeout=timeout)
if msgs:
server.health.record_poll()
values = []
16 changes: 15 additions & 1 deletion millpond/metrics.py
@@ -68,6 +68,16 @@ def labels(self, **kwargs):
"Current pending Arrow bytes awaiting flush",
["pipeline"],
)
_buffer_fullness = Gauge(
"millpond_buffer_fullness",
"Ratio of pending bytes to flush size (0.0 = empty, 1.0 = flush threshold)",
["pipeline"],
)
_consume_batch_size_current = Gauge(
"millpond_consume_batch_size_current",
"Current adaptive consume batch size",
["pipeline"],
)
_consumer_lag = Gauge(
"millpond_consumer_lag",
"Highwater mark minus committed offset",
@@ -129,6 +139,8 @@ def labels(self, **kwargs):
flush_size_bytes = _flush_size_bytes
flush_size_records = _flush_size_records
pending_bytes = _pending_bytes
buffer_fullness = _buffer_fullness
consume_batch_size_current = _consume_batch_size_current
consumer_lag = _consumer_lag
last_committed_offset = _last_committed_offset
schema_columns_added_total = _schema_columns_added_total
@@ -146,7 +158,7 @@ def init(pipeline: str):
global records_skipped_total, errors_total
global flush_duration_seconds, arrow_conversion_seconds
global flush_size_bytes, flush_size_records
global pending_bytes, consumer_lag, last_committed_offset
global pending_bytes, buffer_fullness, consume_batch_size_current, consumer_lag, last_committed_offset
global schema_columns_added_total, schema_columns_widened_total
global rdkafka_replyq, rdkafka_msg_cnt, rdkafka_msg_size
global rdkafka_broker_rtt_avg, rdkafka_broker_rtt_p99
@@ -168,6 +180,8 @@
flush_size_bytes = _flush_size_bytes.labels(pipeline=pipeline)
flush_size_records = _flush_size_records.labels(pipeline=pipeline)
pending_bytes = _pending_bytes.labels(pipeline=pipeline)
buffer_fullness = _buffer_fullness.labels(pipeline=pipeline)
consume_batch_size_current = _consume_batch_size_current.labels(pipeline=pipeline)
schema_columns_added_total = _schema_columns_added_total.labels(pipeline=pipeline)
schema_columns_widened_total = _schema_columns_widened_total.labels(pipeline=pipeline)
rdkafka_replyq = _rdkafka_replyq.labels(pipeline=pipeline)
92 changes: 92 additions & 0 deletions tests/unit/test_backpressure.py
@@ -0,0 +1,92 @@
from unittest.mock import patch

from millpond.backpressure import MIN_BATCH_SIZE, compute_batch_size, init


class TestComputeBatchSize:
def setup_method(self):
init(1000)

@patch("millpond.backpressure.metrics")
def test_empty_buffer_returns_max(self, mock_metrics):
assert compute_batch_size(0, 100_000_000) == 1000

@patch("millpond.backpressure.metrics")
def test_half_full_returns_half_max(self, mock_metrics):
assert compute_batch_size(50_000_000, 100_000_000) == 500

@patch("millpond.backpressure.metrics")
def test_75_percent_returns_quarter_max(self, mock_metrics):
assert compute_batch_size(75_000_000, 100_000_000) == 250

@patch("millpond.backpressure.metrics")
def test_full_buffer_returns_min(self, mock_metrics):
result = compute_batch_size(100_000_000, 100_000_000)
assert result == MIN_BATCH_SIZE

@patch("millpond.backpressure.metrics")
def test_over_full_returns_min(self, mock_metrics):
result = compute_batch_size(150_000_000, 100_000_000)
assert result == MIN_BATCH_SIZE

@patch("millpond.backpressure.metrics")
def test_near_empty_returns_near_max(self, mock_metrics):
result = compute_batch_size(1_000_000, 100_000_000)
assert result == 990

@patch("millpond.backpressure.metrics")
def test_zero_flush_size_returns_max(self, mock_metrics):
assert compute_batch_size(50_000_000, 0) == 1000

@patch("millpond.backpressure.metrics")
def test_negative_flush_size_returns_max(self, mock_metrics):
assert compute_batch_size(50_000_000, -1) == 1000

@patch("millpond.backpressure.metrics")
def test_never_below_min(self, mock_metrics):
# Even with massive overfill
result = compute_batch_size(999_999_999, 100)
assert result == MIN_BATCH_SIZE

@patch("millpond.backpressure.metrics")
def test_respects_custom_max(self, mock_metrics):
init(500)
assert compute_batch_size(0, 100_000_000) == 500
assert compute_batch_size(50_000_000, 100_000_000) == 250

@patch("millpond.backpressure.metrics")
def test_max_below_min_clamped(self, mock_metrics):
init(5)
assert compute_batch_size(0, 100_000_000) == MIN_BATCH_SIZE

@patch("millpond.backpressure.metrics")
def test_sets_fullness_metric(self, mock_metrics):
compute_batch_size(25_000_000, 100_000_000)
mock_metrics.buffer_fullness.set.assert_called_with(0.25)

@patch("millpond.backpressure.metrics")
def test_sets_batch_size_metric(self, mock_metrics):
result = compute_batch_size(25_000_000, 100_000_000)
mock_metrics.consume_batch_size_current.set.assert_called_with(result)

@patch("millpond.backpressure.metrics")
def test_fullness_rounds_to_3_decimals(self, mock_metrics):
compute_batch_size(33_333_333, 100_000_000)
mock_metrics.buffer_fullness.set.assert_called_with(0.333)

@patch("millpond.backpressure.metrics")
def test_linear_scaling(self, mock_metrics):
"""Verify the response is linear across the range."""
init(1000)
results = []
for pct in range(0, 101, 10):
pending = int(100_000_000 * pct / 100)
results.append(compute_batch_size(pending, 100_000_000))

# Should be monotonically decreasing
for i in range(len(results) - 1):
assert results[i] >= results[i + 1], f"Not monotonic at {i}: {results}"

# First should be max, last should be min
assert results[0] == 1000
assert results[-1] == MIN_BATCH_SIZE
10 changes: 10 additions & 0 deletions tests/unit/test_consumer.py
@@ -2,6 +2,7 @@
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka import OFFSET_STORED

@@ -316,6 +317,15 @@ def test_sets_client_id(self, mock_discover, mock_consumer_cls):
consumer_config = mock_consumer_cls.call_args[0][0]
assert consumer_config["client.id"] == "millpond-test-topic-events-2"

@patch("millpond.consumer.Consumer")
@patch("millpond.consumer.discover_partition_count", return_value=8)
def test_sets_queued_max_messages_kbytes(self, mock_discover, mock_consumer_cls):
cfg = _make_cfg()
create(cfg)

consumer_config = mock_consumer_cls.call_args[0][0]
assert consumer_config["queued.max.messages.kbytes"] == 16384

@patch("millpond.consumer.Consumer")
@patch("millpond.consumer.discover_partition_count", return_value=8)
def test_assign_uses_stored_offsets(self, mock_discover, mock_consumer_cls):