diff --git a/AGENT.md b/AGENT.md index 1e88712..748660d 100644 --- a/AGENT.md +++ b/AGENT.md @@ -91,6 +91,19 @@ The S3 credentials use DuckDB-specific env var names, **not** the standard `AWS_ `aws-msk-iam-sasl-signer-python` is an optional dependency (`pip install millpond[msk-iam]`) but the Dockerfile always installs it (`--extra msk-iam`). All production deployments use IAM auth. The optional dep is for local dev where boto3/botocore (~15MB) may not be wanted. +### Adaptive Backpressure + +`backpressure.py` implements proportional batch sizing based on buffer fullness. The consume batch size scales linearly from `CONSUME_BATCH_SIZE` (max, when buffer is empty) down to 10 (min, when buffer is at flush threshold). No state machine, no mode switching — one formula: + +``` +fullness = pending_bytes / flush_size +batch_size = max(10, int(max_batch * (1.0 - fullness))) +``` + +This smooths throughput during catchup without manual tuning. During catchup, the buffer fills quickly, batch size drops, flushes happen frequently with small batches. At steady state, the buffer is mostly empty and batch size stays at max. OOM prevention comes from `queued.max.messages.kbytes` (set in consumer.py), which bounds librdkafka's internal fetch buffer per partition. + +Metrics: `millpond_buffer_fullness` (0.0 = empty, can exceed 1.0 if buffer overshoots flush threshold) and `millpond_consume_batch_size_current` for monitoring. + ### Language: Python The hot path is all C/C++ (librdkafka, orjson, PyArrow, DuckDB). Python is glue — it touches each record once to pass a parsed dict into a list. Performance bottleneck is S3 write latency, not Python. 
diff --git a/README.md b/README.md index a0edb9a..f4bb3ce 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,17 @@ K8s StatefulSet (N replicas) - Static partition assignment via pod ordinal — no consumer groups - If a pod dies, its partitions stop being consumed until K8s restarts it +## Adaptive Backpressure + +The consume batch size automatically scales based on how full the pending buffer is relative to the flush threshold. When the buffer is empty, millpond consumes at full speed. As the buffer approaches the flush size, the batch size drops proportionally, smoothing throughput during catchup and traffic spikes. OOM prevention comes from bounding librdkafka's internal fetch buffer via `queued.max.messages.kbytes` (16MB per partition). + +``` +fullness = pending_bytes / flush_size +batch_size = max(10, int(CONSUME_BATCH_SIZE * (1.0 - fullness))) +``` + +Metrics: `millpond_buffer_fullness` and `millpond_consume_batch_size_current`. + ## Performance The hot path is all C/C++: librdkafka → orjson → PyArrow → DuckDB (zero-copy Arrow scan). Python is glue. diff --git a/millpond/backpressure.py b/millpond/backpressure.py new file mode 100644 index 0000000..f65ed26 --- /dev/null +++ b/millpond/backpressure.py @@ -0,0 +1,50 @@ +"""Adaptive batch sizing based on buffer fullness. + +Implements proportional backpressure: when the pending buffer approaches +the flush threshold, reduce the consume batch size to smooth throughput. +When the buffer is mostly empty, consume at full speed. + + fullness = pending_bytes / flush_size # 0.0 to 1.0+ + batch_size = max(MIN, int(MAX * (1.0 - fullness))) + +This handles catchup, steady state, and traffic spikes with a single +code path. No state machine, no mode switching. + +NOTE: This is a throughput-smoothing mechanism, not OOM prevention. +The actual memory safety knob is librdkafka's `queued.max.messages.kbytes` +(set in consumer.py), which bounds the internal fetch buffer per partition. 
+Without that, librdkafka pre-fetches up to 64MB per partition regardless +of how slowly we dequeue. +""" + +from millpond import metrics + +# Minimum batch size — never go below this to avoid per-call overhead domination +MIN_BATCH_SIZE = 10 + +# The batch size at zero buffer fullness (max throughput) +# Overridden at init from cfg.consume_batch_size +_max_batch_size: int = 1000 + + +def init(max_batch_size: int) -> None: + """Set the max batch size from config. Called once at startup.""" + global _max_batch_size + _max_batch_size = max(max_batch_size, MIN_BATCH_SIZE) + + +def compute_batch_size(pending_bytes: int, flush_size: int) -> int: + """Compute the adaptive batch size based on buffer fullness. + + Returns a value between MIN_BATCH_SIZE and _max_batch_size. + """ + if flush_size <= 0: + return _max_batch_size + + fullness = pending_bytes / flush_size + batch_size = max(MIN_BATCH_SIZE, int(_max_batch_size * (1.0 - fullness))) + + metrics.buffer_fullness.set(round(fullness, 3)) + metrics.consume_batch_size_current.set(batch_size) + + return batch_size diff --git a/millpond/consumer.py b/millpond/consumer.py index c8c2b5b..52e70f6 100644 --- a/millpond/consumer.py +++ b/millpond/consumer.py @@ -133,6 +133,9 @@ def create(cfg: Config) -> Consumer: "statistics.interval.ms": cfg.stats_interval_ms, "stats_cb": _on_stats, } + # Default to 16MB per partition to bound librdkafka internal memory. + # Allow override via KAFKA_CONSUMER_QUEUED_MAX_MESSAGES_KBYTES. 
+ consumer_config.setdefault("queued.max.messages.kbytes", 16384) consumer = Consumer(consumer_config) diff --git a/millpond/main.py b/millpond/main.py index 8a633fb..503edb2 100644 --- a/millpond/main.py +++ b/millpond/main.py @@ -6,7 +6,7 @@ import pyarrow as pa from confluent_kafka import TopicPartition -from millpond import arrow_converter, config, consumer, ducklake, logging_config, metrics, schema, server +from millpond import arrow_converter, backpressure, config, consumer, ducklake, logging_config, metrics, schema, server log = logging.getLogger(__name__) @@ -132,6 +132,7 @@ def main(): db = ducklake.connect(cfg) schema_mgr = schema.SchemaManager(db, cfg.ducklake_table) kafka = consumer.create(cfg) + backpressure.init(cfg.consume_batch_size) server.health.mark_started() shutdown = False @@ -158,7 +159,8 @@ def on_signal(signum, _frame): remaining = cfg.flush_interval_s - (time.monotonic() - last_flush) timeout = max(remaining, 0.1) - msgs = kafka.consume(num_messages=cfg.consume_batch_size, timeout=timeout) + batch_size = backpressure.compute_batch_size(pending_bytes, cfg.flush_size) + msgs = kafka.consume(num_messages=batch_size, timeout=timeout) if msgs: server.health.record_poll() values = [] diff --git a/millpond/metrics.py b/millpond/metrics.py index d09c660..3b49f41 100644 --- a/millpond/metrics.py +++ b/millpond/metrics.py @@ -68,6 +68,16 @@ def labels(self, **kwargs): "Current pending Arrow bytes awaiting flush", ["pipeline"], ) +_buffer_fullness = Gauge( + "millpond_buffer_fullness", + "Ratio of pending bytes to flush size (0.0 = empty, 1.0 = flush threshold)", + ["pipeline"], +) +_consume_batch_size_current = Gauge( + "millpond_consume_batch_size_current", + "Current adaptive consume batch size", + ["pipeline"], +) _consumer_lag = Gauge( "millpond_consumer_lag", "Highwater mark minus committed offset", @@ -129,6 +139,8 @@ def labels(self, **kwargs): flush_size_bytes = _flush_size_bytes flush_size_records = _flush_size_records pending_bytes = 
_pending_bytes +buffer_fullness = _buffer_fullness +consume_batch_size_current = _consume_batch_size_current consumer_lag = _consumer_lag last_committed_offset = _last_committed_offset schema_columns_added_total = _schema_columns_added_total @@ -146,7 +158,7 @@ def init(pipeline: str): global records_skipped_total, errors_total global flush_duration_seconds, arrow_conversion_seconds global flush_size_bytes, flush_size_records - global pending_bytes, consumer_lag, last_committed_offset + global pending_bytes, buffer_fullness, consume_batch_size_current, consumer_lag, last_committed_offset global schema_columns_added_total, schema_columns_widened_total global rdkafka_replyq, rdkafka_msg_cnt, rdkafka_msg_size global rdkafka_broker_rtt_avg, rdkafka_broker_rtt_p99 @@ -168,6 +180,8 @@ def init(pipeline: str): flush_size_bytes = _flush_size_bytes.labels(pipeline=pipeline) flush_size_records = _flush_size_records.labels(pipeline=pipeline) pending_bytes = _pending_bytes.labels(pipeline=pipeline) + buffer_fullness = _buffer_fullness.labels(pipeline=pipeline) + consume_batch_size_current = _consume_batch_size_current.labels(pipeline=pipeline) schema_columns_added_total = _schema_columns_added_total.labels(pipeline=pipeline) schema_columns_widened_total = _schema_columns_widened_total.labels(pipeline=pipeline) rdkafka_replyq = _rdkafka_replyq.labels(pipeline=pipeline) diff --git a/tests/unit/test_backpressure.py b/tests/unit/test_backpressure.py new file mode 100644 index 0000000..d0dae2b --- /dev/null +++ b/tests/unit/test_backpressure.py @@ -0,0 +1,92 @@ +from unittest.mock import patch + +from millpond.backpressure import MIN_BATCH_SIZE, compute_batch_size, init + + +class TestComputeBatchSize: + def setup_method(self): + init(1000) + + @patch("millpond.backpressure.metrics") + def test_empty_buffer_returns_max(self, mock_metrics): + assert compute_batch_size(0, 100_000_000) == 1000 + + @patch("millpond.backpressure.metrics") + def test_half_full_returns_half_max(self, 
mock_metrics): + assert compute_batch_size(50_000_000, 100_000_000) == 500 + + @patch("millpond.backpressure.metrics") + def test_75_percent_returns_quarter_max(self, mock_metrics): + assert compute_batch_size(75_000_000, 100_000_000) == 250 + + @patch("millpond.backpressure.metrics") + def test_full_buffer_returns_min(self, mock_metrics): + result = compute_batch_size(100_000_000, 100_000_000) + assert result == MIN_BATCH_SIZE + + @patch("millpond.backpressure.metrics") + def test_over_full_returns_min(self, mock_metrics): + result = compute_batch_size(150_000_000, 100_000_000) + assert result == MIN_BATCH_SIZE + + @patch("millpond.backpressure.metrics") + def test_near_empty_returns_near_max(self, mock_metrics): + result = compute_batch_size(1_000_000, 100_000_000) + assert result == 990 + + @patch("millpond.backpressure.metrics") + def test_zero_flush_size_returns_max(self, mock_metrics): + assert compute_batch_size(50_000_000, 0) == 1000 + + @patch("millpond.backpressure.metrics") + def test_negative_flush_size_returns_max(self, mock_metrics): + assert compute_batch_size(50_000_000, -1) == 1000 + + @patch("millpond.backpressure.metrics") + def test_never_below_min(self, mock_metrics): + # Even with massive overfill + result = compute_batch_size(999_999_999, 100) + assert result == MIN_BATCH_SIZE + + @patch("millpond.backpressure.metrics") + def test_respects_custom_max(self, mock_metrics): + init(500) + assert compute_batch_size(0, 100_000_000) == 500 + assert compute_batch_size(50_000_000, 100_000_000) == 250 + + @patch("millpond.backpressure.metrics") + def test_max_below_min_clamped(self, mock_metrics): + init(5) + assert compute_batch_size(0, 100_000_000) == MIN_BATCH_SIZE + + @patch("millpond.backpressure.metrics") + def test_sets_fullness_metric(self, mock_metrics): + compute_batch_size(25_000_000, 100_000_000) + mock_metrics.buffer_fullness.set.assert_called_with(0.25) + + @patch("millpond.backpressure.metrics") + def test_sets_batch_size_metric(self, 
mock_metrics): + result = compute_batch_size(25_000_000, 100_000_000) + mock_metrics.consume_batch_size_current.set.assert_called_with(result) + + @patch("millpond.backpressure.metrics") + def test_fullness_rounds_to_3_decimals(self, mock_metrics): + compute_batch_size(33_333_333, 100_000_000) + mock_metrics.buffer_fullness.set.assert_called_with(0.333) + + @patch("millpond.backpressure.metrics") + def test_linear_scaling(self, mock_metrics): + """Verify the response is linear across the range.""" + init(1000) + results = [] + for pct in range(0, 101, 10): + pending = int(100_000_000 * pct / 100) + results.append(compute_batch_size(pending, 100_000_000)) + + # Should be monotonically decreasing + for i in range(len(results) - 1): + assert results[i] >= results[i + 1], f"Not monotonic at {i}: {results}" + + # First should be max, last should be min + assert results[0] == 1000 + assert results[-1] == MIN_BATCH_SIZE diff --git a/tests/unit/test_consumer.py b/tests/unit/test_consumer.py index 109bdc1..356d983 100644 --- a/tests/unit/test_consumer.py +++ b/tests/unit/test_consumer.py @@ -316,6 +316,15 @@ def test_sets_client_id(self, mock_discover, mock_consumer_cls): consumer_config = mock_consumer_cls.call_args[0][0] assert consumer_config["client.id"] == "millpond-test-topic-events-2" + @patch("millpond.consumer.Consumer") + @patch("millpond.consumer.discover_partition_count", return_value=8) + def test_sets_queued_max_messages_kbytes(self, mock_discover, mock_consumer_cls): + cfg = _make_cfg() + create(cfg) + + consumer_config = mock_consumer_cls.call_args[0][0] + assert consumer_config["queued.max.messages.kbytes"] == 16384 + + @patch("millpond.consumer.Consumer") + @patch("millpond.consumer.discover_partition_count", return_value=8) + def test_assign_uses_stored_offsets(self, mock_discover, mock_consumer_cls):