Changelog
All notable changes to aiokpl are documented here.
Format: Keep a Changelog.
Versioning: SemVer. Pre-1.0 the
public API may change between minor versions; the on-the-wire KPL
aggregation format is frozen.
Unreleased
0.2.0 — 2026-05-21
First public release.
Added
- Producer (
async with Producer(config) as producer:) wiring the full
pipeline:Aggregator→Limiter→Collector→Sender→Retrier.
Per-stream pipelines lazily created, backpressure via
max_outstanding_recordssemaphore, graceful shutdown via
flush()+__aexit__. SyncProducerfor callers not running an async event loop. Spawns an
internalanyio.from_thread.BlockingPortal; thread-safe;wait(timeout=)
andflush(timeout=)raiseTimeoutErroron elapse.Outcome[T]— backend-agnostic one-shot future replacing
asyncio.Futureso the producer works on bothasyncioandtrio.- Byte-exact KPL aggregation codec (
encode_aggregated/
decode_aggregated) with hand-rolled protobuf — zero dependency on the
protobufpackage. ShardMapwithbisect_leftprediction over a cached, async-refreshed
list of shards. State machineINVALID → UPDATING → READYwith
invalidate()semantics, exponential backoff (1 s → 30 s), and
closed-shard cleanup TTL.Reducer[I, B]generic deadline-driven batcher with FIFO-by-deadline
packing and excess re-injection.Aggregatorproducing per-shard aggregated batches; falls back to
single-record mode when the shard map is not ready or aggregation is
disabled.Collectorwith 500-record / 5-MiB / 256-KiB-per-shard short-circuit.Limiter+TokenBucket— multi-stream token bucket (records/s +
bytes/s) with a 25 ms drain loop; per-shard isolation;Expiredroute
for records that age past their TTL.Senderwrappingaiobotocorefor thePutRecordscall. Captures
per-record outcomes and surfaces request-level failures uniformly.Retrierwith the full classification table: throttle / transient /
wrong-shard (split-aware) / expired. Per-recordAttempthistory visible
to the user viaRecordResult.attempts.- Vendor-neutral metrics via the
MetricsSinkProtocol. First-party
sinks:NullSink(default, zero overhead),InMemorySink,
CloudWatchSink. Optional sinks behind extras:OpenTelemetrySink
(aiokpl[otel]),DatadogSink(aiokpl[datadog]). anyioas the async runtime — works on bothasyncioandtrio
for the non-network stages. Network layer (Sender,Retrier,
CloudWatchSink) is asyncio-only becauseaiobotocoreis asyncio-only.
Tested
- 480 unit tests, every async test parametrized across both
asyncioand
triobackends. - 20 integration tests against
etspaceman/kinesis-mock(byte-exact
hash-key routing, paginatedListShards, split-shard children, retry
paths, backpressure, end-to-end aggregation roundtrip). - 100 % line and branch coverage on the
aiokpl/package. ruff check,ruff format,ty check, andmkdocs build --strictall
clean.