Releases: Vadimus1983/kafko
v0.3.0 — Multi-partition topics + resumable consumer groups
Multi-partition topics with key-based routing. A topic can now own N partitions,
each with its own writer task and log, so producers write in parallel. Records
route by key (hash(key) % partitions) so same-key records keep their order;
keyless records spread round-robin. A single consumer reads all partitions merged
into one stream. Ordering is guaranteed within a partition; cross-partition
order is intentionally undefined — the standard Kafka contract. Default partition
count is 1, so single-partition topics behave as before.
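The routing rule above can be shown with a small partition router. This is a minimal sketch. It assumes 64-bit FNV-1a over the key bytes (the notes name FNV-1a but not its width) and an atomic counter for round-robin placement of keyless records. `PartitionRouter`, `route` and `fnv1a_64` are hypothetical names, not kafko's API.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Standard 64-bit FNV-1a parameters (assumed width; kafko's variant may differ).
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// 64-bit FNV-1a: XOR each byte into the hash, then multiply by the prime.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash = FNV_OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash
}

/// Hypothetical router illustrating the same idea as kafko's `Topic` routing.
pub struct PartitionRouter {
    partitions: u32,
    next_keyless: AtomicU64,
}

impl PartitionRouter {
    pub fn new(partitions: u32) -> Self {
        assert!(partitions > 0, "a topic needs at least one partition");
        Self { partitions, next_keyless: AtomicU64::new(0) }
    }

    /// Same key -> same partition, so per-key order holds.
    /// No key -> round-robin across all partitions.
    pub fn route(&self, key: Option<&[u8]>) -> u32 {
        let n = u64::from(self.partitions);
        match key {
            Some(k) => (fnv1a_64(k) % n) as u32,
            None => (self.next_keyless.fetch_add(1, Ordering::Relaxed) % n) as u32,
        }
    }
}
```

The partition depends only on the key bytes and the partition count. Changing the count would therefore remap existing keys, and this sketch does not handle that.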
Added
- Resumable consumers via committed offsets (consumer groups, part 1).
Kafko::consumer_for_group(topic, group) returns a consumer that resumes from
the group's durably committed position instead of offset 0. This is the
"continue where it stopped after a restart" behaviour. Consumer::commit()
persists the current per-partition read position (atomic temp + fsync + rename
to <topic>/offsets/<group>, CRC-framed). Consumer::committed(partition) and
Consumer::group() provide introspection. Distinct groups on a topic keep independent
positions (durable pub/sub fan-out). A torn or corrupt offset file degrades to
"start from 0", never an error. consumer_for stays anonymous (it reads from 0, and
commit() is a no-op). New error: KafkoError::InvalidGroupName.
Only a single active consumer per group is supported for now; multi-member partition assignment and
rebalancing are planned for a later slice.
- Kafko::create_topic_with_partitions(name, count) and
create_topic_with_config_and_partitions(name, cfg, count): create a topic
with count partitions. count == 0 returns KafkoError::InvalidPartitionCount.
- Kafko::partition_count(name) -> Option<u32>.
- Topic: public type owning a topic's partitions and its routing (FNV-1a key
hash + round-robin). Obtained via Kafko::topic.
- RecordPosition { partition, offset }: returned by all Producer send
methods.
- Producer::send_to(partition, key, value) for explicit partition targeting,
and Producer::partition_count().
- Consumer::next_with_position() (the record plus its RecordPosition),
from_topic / from_topic_at, seek_all(offset), seek(partition, offset),
position(partition), partition_count().
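The commit path above has three parts: write a temp file, fsync it, and rename it over <topic>/offsets/<group>. The file is CRC-framed, and a torn or corrupt file reads back as "start from 0". All of this can be sketched with the standard library alone. The byte layout used here is an assumption for illustration: an entry count, then (partition, offset) pairs, then a trailing CRC-32. The names `commit_offsets` and `load_offsets` are also hypothetical. kafko's real file format is not documented in these notes.

```rust
use std::collections::BTreeMap;
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Bitwise CRC-32 (IEEE, reflected, polynomial 0xEDB88320). Slow but dependency-free.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg(); // all ones if the low bit is set
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Hypothetical layout (little-endian): u32 count, count x (u32 partition, u64 offset),
/// then a u32 CRC of everything before it.
fn encode(positions: &BTreeMap<u32, u64>) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + positions.len() * 12 + 4);
    buf.extend_from_slice(&(positions.len() as u32).to_le_bytes());
    for (&partition, &offset) in positions {
        buf.extend_from_slice(&partition.to_le_bytes());
        buf.extend_from_slice(&offset.to_le_bytes());
    }
    let crc = crc32(&buf);
    buf.extend_from_slice(&crc.to_le_bytes());
    buf
}

/// Returns None for any torn, truncated, or corrupt file.
fn decode(bytes: &[u8]) -> Option<BTreeMap<u32, u64>> {
    let (body, crc_bytes) = bytes.split_at(bytes.len().checked_sub(4)?);
    if crc32(body) != u32::from_le_bytes(crc_bytes.try_into().ok()?) {
        return None;
    }
    let count = u32::from_le_bytes(body.get(..4)?.try_into().ok()?) as usize;
    let entries = &body[4..];
    if entries.len() != count.checked_mul(12)? {
        return None;
    }
    let mut out = BTreeMap::new();
    for chunk in entries.chunks_exact(12) {
        let partition = u32::from_le_bytes(chunk[..4].try_into().ok()?);
        let offset = u64::from_le_bytes(chunk[4..].try_into().ok()?);
        out.insert(partition, offset);
    }
    Some(out)
}

/// Atomic replace: write a temp file, fsync it, then rename it over the final path.
pub fn commit_offsets(dir: &Path, group: &str, positions: &BTreeMap<u32, u64>) -> io::Result<()> {
    fs::create_dir_all(dir)?;
    let tmp_path = dir.join(format!("{group}.tmp"));
    let mut tmp = File::create(&tmp_path)?;
    tmp.write_all(&encode(positions))?;
    tmp.sync_all()?; // data is durable before it becomes visible under the final name
    drop(tmp);
    fs::rename(&tmp_path, dir.join(group))?;
    // A production version would likely also fsync the directory so the rename survives a crash.
    Ok(())
}

/// A missing or corrupt file degrades to "start from 0" (an empty map), never an error.
pub fn load_offsets(dir: &Path, group: &str) -> BTreeMap<u32, u64> {
    fs::read(dir.join(group))
        .ok()
        .and_then(|bytes| decode(&bytes))
        .unwrap_or_default()
}
```

Each group gets its own file, so two groups on the same topic never overwrite each other's positions.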
Changed (breaking)
- Producer::send / send_record now return RecordPosition instead of a
bare u64 offset (an offset is only meaningful within a partition). send_batch
/ send_batch_records return Vec<RecordPosition>. Read .offset() /
.partition() on the result.
- send_batch atomicity is now per-partition. A batch whose records route to
different partitions is atomic within each partition, not across them. For a
single-partition topic this is unchanged (one fully-atomic append).
- On-disk layout changed to
<dir>/<topic>/<partition>/<segments>. Data
directories written by kafko <= 0.2 (segments directly under the topic dir) will
not open: Kafko::open returns KafkoError::InvalidTopicLayout. There is no
automatic migration.
- Kafko::topic returns Arc<Topic> (was Arc<Partition>). Producer::new takes Arc<Topic>; Consumer is built from a Topic
(from_topic / from_topic_at), and Consumer::seek now takes a partition
index (use seek_all for the old whole-stream behaviour).
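The new send_batch atomicity contract works like this. A batch is split into one sub-batch per partition, and each sub-batch is appended atomically by that partition's writer. A RecordPosition is still reported for every input record, in its original order. If something fails, some partitions may have their sub-batch written and others may not. The sketch below is minimal. `RecordPosition` mirrors the struct named above. `split_by_partition`, `assign_positions` and the routing closure are hypothetical helpers. No real appends happen here; the code only computes the offsets each atomic per-partition append would produce.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordPosition {
    pub partition: u32,
    pub offset: u64,
}

/// Groups record indices by target partition, keeping input order within each group.
pub fn split_by_partition<R>(records: &[R], route: impl Fn(&R) -> u32) -> BTreeMap<u32, Vec<usize>> {
    let mut groups: BTreeMap<u32, Vec<usize>> = BTreeMap::new();
    for (i, r) in records.iter().enumerate() {
        groups.entry(route(r)).or_default().push(i);
    }
    groups
}

/// Treats each sub-batch as one contiguous append starting at that partition's
/// next offset, and returns positions in the caller's original record order.
pub fn assign_positions(
    groups: &BTreeMap<u32, Vec<usize>>,
    next_offsets: &mut BTreeMap<u32, u64>,
    total: usize,
) -> Vec<RecordPosition> {
    let mut out = vec![RecordPosition { partition: 0, offset: 0 }; total];
    for (&partition, indices) in groups {
        let base = next_offsets.entry(partition).or_insert(0);
        for (k, &i) in indices.iter().enumerate() {
            // Offsets are consecutive within a partition, independent across partitions.
            out[i] = RecordPosition { partition, offset: *base + k as u64 };
        }
        *base += indices.len() as u64;
    }
    out
}
```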
v0.2.0
Performance
- LZ4 hot-path allocation reduced by ~99.997%. Compression::Lz4's encode
path now amortizes its internal hash table across calls via lz4_flex 0.13's
new compress_into_with_table API and a per-thread CompressTable. The
per-record 8 KiB hash-table allocation that dominated heap traffic on
LZ4-heavy workloads (~1.2 GiB cumulative across 300 K records on lz4_flex
0.11) is now amortized to a single 8 KiB allocation per encoder thread for
the process lifetime. Measured: 24.9 KiB total across 100 000 LZ4 sends
in the kafko-bench lz4_sequential scenario (0.10% of total process
allocation, down from 93%). LZ4 sequential throughput now tracks no-codec
sequential throughput within 3% (164 K vs 160 K rec/s on the same path).
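The per-thread reuse pattern is easiest to see in isolation. The sketch below stands in for the real CompressTable with a plain 8 KiB `Vec<u32>` (2048 slots). The table lives in a `thread_local!` `RefCell` and is zeroed before each use. `with_table` and the toy `count_repeats` match finder are hypothetical. They only exercise the table: this is not LZ4, and it does not call lz4_flex.

```rust
use std::cell::RefCell;

/// 2048 u32 slots = 8 KiB, the same size as the per-record table described above.
const TABLE_SLOTS: usize = 2048;

thread_local! {
    // Allocated once per thread on first use, then reused for the thread's lifetime.
    static MATCH_TABLE: RefCell<Vec<u32>> = RefCell::new(vec![0; TABLE_SLOTS]);
}

/// Runs `f` with this thread's scratch table, zeroed so no state leaks between calls.
/// Calling `with_table` again from inside `f` would panic (the RefCell is already borrowed).
fn with_table<R>(f: impl FnOnce(&mut [u32]) -> R) -> R {
    MATCH_TABLE.with(|cell| {
        let mut table = cell.borrow_mut();
        table.fill(0);
        f(table.as_mut_slice())
    })
}

/// Toy match finder: counts 4-byte windows whose hash slot was already filled
/// earlier in `input`. Hash collisions also count, so this is only approximate.
fn count_repeats(input: &[u8]) -> usize {
    with_table(|table| {
        let mut repeats = 0;
        for (i, w) in input.windows(4).enumerate() {
            let word = u32::from_le_bytes([w[0], w[1], w[2], w[3]]);
            // Multiplicative hash down to 11 bits (0..2048).
            let slot = (word.wrapping_mul(2_654_435_761) >> 21) as usize;
            if table[slot] != 0 {
                repeats += 1;
            }
            table[slot] = i as u32 + 1; // store position + 1 so 0 means "empty"
        }
        repeats
    })
}
```

Zeroing 8 KiB on each call is cheap compared with a fresh heap allocation per record. This is the same trade-off the LZ4 change makes.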
Added
- Cargo features for compression codecs. Two new opt-in features,
compression-lz4 and compression-zstd, plus the convenience
compression-all, make lz4_flex and zstd optional dependencies. A
default cargo add kafko no longer pulls either codec into the dep tree.
Compression::Lz4 and Compression::Zstd remain visible in the public API
regardless of feature flags, so on-disk records written by another build are
detectable and produce a friendly KafkoError::CompressionUnavailable
rather than mis-decoding.
- KafkoError::CompressionUnavailable(Compression): error variant returned
when encoding or decoding a record under a codec whose Cargo feature is not
enabled. The Display message names the missing feature.
- Compression::is_available() -> bool: runtime feature detection for
callers that want to fall back gracefully between codecs.
- kafko-bench lz4_sequential scenario: mirrors the sequential scenario
under Compression::Lz4 so the hotpath alloc table can attribute
compression::compress heap traffic to the LZ4 path specifically. Gated
behind the compression-lz4 feature.
- [package.metadata.docs.rs]: docs.rs builds with all-features so both
codec variants are documented in full on the rendered crate page.
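The pattern behind these items is to keep every codec variant in the public enum and gate only its availability. Availability can be expressed with `cfg!(feature = "...")`. Here is a minimal sketch under that assumption. This `Compression`, `KafkoError`, `pick_codec` and `require` are illustrative stand-ins, not kafko's source. The Cargo.toml wiring (optional dependencies, and compression-all enabling both features) is not shown.

```rust
use std::fmt;

/// All variants exist regardless of features, so records from other builds are still recognised.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    None,
    Lz4,
    Zstd,
}

impl Compression {
    /// Cargo feature that gates this codec (no feature needed for Compression::None).
    pub fn feature(self) -> Option<&'static str> {
        match self {
            Compression::None => None,
            Compression::Lz4 => Some("compression-lz4"),
            Compression::Zstd => Some("compression-zstd"),
        }
    }

    /// Compile-time feature flags surfaced as a runtime check.
    pub fn is_available(self) -> bool {
        match self {
            Compression::None => true,
            Compression::Lz4 => cfg!(feature = "compression-lz4"),
            Compression::Zstd => cfg!(feature = "compression-zstd"),
        }
    }
}

#[derive(Debug)]
pub enum KafkoError {
    CompressionUnavailable(Compression),
}

impl fmt::Display for KafkoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The message names the missing feature, as the notes describe.
            KafkoError::CompressionUnavailable(c) => match c.feature() {
                Some(feat) => write!(f, "{c:?} compression is unavailable; enable the `{feat}` Cargo feature"),
                None => write!(f, "{c:?} compression is unavailable"),
            },
        }
    }
}

impl std::error::Error for KafkoError {}

/// Graceful fallback: the first available codec in preference order, else no compression.
pub fn pick_codec(preferred: &[Compression]) -> Compression {
    preferred.iter().copied().find(|c| c.is_available()).unwrap_or(Compression::None)
}

/// Fail loudly instead of mis-decoding when a codec is compiled out.
pub fn require(codec: Compression) -> Result<Compression, KafkoError> {
    if codec.is_available() {
        Ok(codec)
    } else {
        Err(KafkoError::CompressionUnavailable(codec))
    }
}
```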
Changed (breaking)
- Record::encode_with signature. Previously -> usize; now
-> Result<usize>. The error case is KafkoError::CompressionUnavailable,
returned when encoding under a codec whose feature is not enabled. Callers
using Producer::send / Producer::send_batch are unaffected, since the
Producer API was already Result-returning and propagates the new error
naturally. Callers using Record::encode_with directly must add ? or
.unwrap(). Record::encode() (the Compression::None convenience) is
unchanged.
- Default Cargo features. Previously lz4_flex and zstd were
unconditional dependencies; now both are gated behind opt-in features. A
downstream crate that moves its kafko requirement from 0.1.1 to 0.2.0
silently loses LZ4/Zstd support until the features are enabled. (A plain
cargo update will not make this jump, because ^0.1 excludes 0.2.0.)
To preserve v0.1.1 behaviour, change the dependency to:
kafko = { version = "0.2", features = ["compression-all"] }
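For direct callers of Record::encode_with, the migration is mechanical. The sketch below uses a hypothetical stand-in `Record` whose `encode_with` has the new `-> Result<...>` shape. The real method's parameters are not listed in these notes, so `Codec`, `CompressionUnavailable`, the buffer argument and `encode_all` are all assumptions. In the sketch only `Codec::None` counts as available. The caller propagates the error with `?`, one of the two options the notes mention.

```rust
/// Hypothetical codec selector (stand-in, not kafko's Compression).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
    None,
    Lz4,
}

/// Hypothetical error (stand-in for KafkoError::CompressionUnavailable).
#[derive(Debug, PartialEq, Eq)]
pub struct CompressionUnavailable(pub Codec);

/// Hypothetical record; NOT kafko's Record.
pub struct Record {
    pub value: Vec<u8>,
}

impl Record {
    /// v0.2-style shape: Ok(bytes written), or an error when the codec is compiled out.
    /// Here encoding is just a copy, and only Codec::None is treated as available.
    pub fn encode_with(&self, buf: &mut Vec<u8>, codec: Codec) -> Result<usize, CompressionUnavailable> {
        match codec {
            Codec::None => {
                buf.extend_from_slice(&self.value);
                Ok(self.value.len())
            }
            other => Err(CompressionUnavailable(other)),
        }
    }
}

/// A v0.1 call site read `total += r.encode_with(..);` and now needs `?`.
pub fn encode_all(records: &[Record], codec: Codec) -> Result<usize, CompressionUnavailable> {
    let mut buf = Vec::new();
    let mut total = 0;
    for r in records {
        total += r.encode_with(&mut buf, codec)?;
    }
    Ok(total)
}
```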
Internal
- lz4_flex bumped from 0.11 to 0.13. The 0.11 line is in maintenance mode;
0.13 ships the same security fixes as 0.11.6 plus the
compress_into_with_table API that enables the per-thread hash-table reuse
win above. Wire format unchanged; existing on-disk segments read back
identically.
- New LZ4_TABLE: RefCell<lz4_flex::block::CompressTable> thread-local in
crates/kafko/src/compression.rs alongside the existing
ZSTD_COMPRESSOR / ZSTD_DECOMPRESSOR thread-locals.
v0.1.1 — send_batch + LZ4 OOM fix
Security
- Fixed: LZ4 decompress OOM (DoS). Records with Compression::Lz4 whose
payload had an adversarial 4-byte LE size prefix could coerce
lz4_flex::decompress_size_prepended into a ~4 GiB Vec::with_capacity
call before the compressed bytes were validated, OOM-crashing the host
process. Triggerable by any read of a corrupt or adversarial segment file.
Fixed by bounding the claimed decompressed size at 16 MiB before delegating
to lz4_flex (matching the existing Zstd cap). Discovered via the new
cargo-fuzz decode_record_structured target. v0.1.0 is yanked.
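The fix amounts to validating the size prefix before any buffer of that size is requested. Here is a minimal sketch, assuming the prefix is a 4-byte little-endian u32 as described. The constant reuses the DECOMPRESS_MAX_SIZE name from the Changed section below. `checked_decompressed_size` and `PrefixError` are hypothetical names.

```rust
/// 16 MiB cap on the decompressed size a payload may claim (value from the notes).
pub const DECOMPRESS_MAX_SIZE: usize = 16 * 1024 * 1024;

#[derive(Debug, PartialEq, Eq)]
pub enum PrefixError {
    /// Fewer than 4 bytes: no size prefix to read.
    Truncated,
    /// The prefix claims more than the cap allows.
    TooLarge(usize),
}

/// Reads the 4-byte little-endian size prefix and rejects oversized claims
/// before anything allocates a buffer of that size.
pub fn checked_decompressed_size(payload: &[u8]) -> Result<usize, PrefixError> {
    let prefix: [u8; 4] = payload
        .get(..4)
        .and_then(|b| b.try_into().ok())
        .ok_or(PrefixError::Truncated)?;
    let claimed = u32::from_le_bytes(prefix) as usize;
    if claimed > DECOMPRESS_MAX_SIZE {
        return Err(PrefixError::TooLarge(claimed));
    }
    Ok(claimed)
}
```

With this guard, a hostile 0xFFFFFFFF prefix costs a 4-byte read instead of a ~4 GiB allocation. Decompression, and validation of the compressed bytes, would run only on `Ok`.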
Added
- Producer::send_batch(Vec<(Option<Bytes>, Bytes)>) and
Producer::send_batch_records(Vec<Record>): atomic, single-mpsc-round-trip
batched appends. ~10× throughput vs a loop of send() at N = 1024
(2.53 M rec/s vs 252 K rec/s, criterion-measured, 256 B records, in-process).
- Partition::append_batch(Vec<Record>) -> Result<Vec<u64>>: the actor
primitive underlying Producer::send_batch.
- Two new criterion benches: send_batch (batch-size × codec sweep) and
config_sweep (segment_size_threshold / index_interval / preset configs).
- Cargo-fuzz scaffold under fuzz/ with five targets: decode_record,
decode_record_structured, recovery_torn_tail, sparse_index_parse,
log_op_sequence. WSL2 setup is documented in the README.
- Proptest coverage expanded to all three compression codecs, the
SparseIndex::lookup invariant, the Log append-then-read invariant, and
Log rotation under random LogConfig.
- "Performance recipes — pick once, ship it" section in both READMEs:
decision table + four code recipes + "what NOT to tune" guidance.
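Partition::append_batch is described as an actor primitive. With a partition actor behind an mpsc channel, a batch costs one request/reply round trip instead of one per record. Here is a minimal sketch using std::sync::mpsc and an in-memory log. `Command`, `spawn_partition_actor`, `send` and `send_batch` are hypothetical names, not kafko's API. "Atomic" here only means that no other command can interleave with the batch. On-disk durability and all-or-nothing writes are not modelled.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Messages the hypothetical partition actor accepts; each carries its own reply channel.
enum Command {
    Append { record: Vec<u8>, reply: Sender<u64> },
    AppendBatch { records: Vec<Vec<u8>>, reply: Sender<Vec<u64>> },
}

/// Spawns an actor that owns the "log" (an in-memory Vec) and assigns consecutive offsets.
/// The join handle returns the final log once every Sender has been dropped.
fn spawn_partition_actor() -> (Sender<Command>, JoinHandle<Vec<Vec<u8>>>) {
    let (tx, rx) = mpsc::channel::<Command>();
    let handle = thread::spawn(move || {
        let mut log: Vec<Vec<u8>> = Vec::new();
        for cmd in rx {
            match cmd {
                Command::Append { record, reply } => {
                    log.push(record);
                    let _ = reply.send(log.len() as u64 - 1);
                }
                Command::AppendBatch { records, reply } => {
                    // The whole batch is handled inside one message, so nothing interleaves with it.
                    let base = log.len() as u64;
                    let n = records.len() as u64;
                    log.extend(records);
                    let _ = reply.send((base..base + n).collect());
                }
            }
        }
        log
    });
    (tx, handle)
}

/// One round trip per record.
fn send(actor: &Sender<Command>, record: Vec<u8>) -> u64 {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor.send(Command::Append { record, reply: reply_tx }).expect("actor stopped");
    reply_rx.recv().expect("actor dropped reply")
}

/// One round trip for the whole batch.
fn send_batch(actor: &Sender<Command>, records: Vec<Vec<u8>>) -> Vec<u64> {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor.send(Command::AppendBatch { records, reply: reply_tx }).expect("actor stopped");
    reply_rx.recv().expect("actor dropped reply")
}
```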
Fixed
- Segment::would_overflow no longer reports overflow on an empty segment.
Previously, the first append (or batch) whose wire size exceeded
segment_size_threshold on a fresh, empty log triggered rotation,
which then tried to create a new segment at next_offset = 0 and failed
with ErrorKind::AlreadyExists. The size cap is now correctly a soft
target on segments with content; empty segments always accept the first
write. Under the default config this was reachable only with single records
larger than 1 GiB; any custom config with a small threshold could hit it.
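The corrected rule fits in one boolean expression: only a segment that already holds data can overflow. Here is a minimal sketch. `Segment` is a hypothetical stand-in that tracks only its size. The sketch also assumes the check compares current size plus incoming wire size against segment_size_threshold with a strict `>`. kafko's exact comparison is not stated in these notes.

```rust
/// Hypothetical stand-in for a segment's size bookkeeping.
pub struct Segment {
    size_bytes: u64,
    threshold: u64,
}

impl Segment {
    pub fn new(threshold: u64) -> Self {
        Self { size_bytes: 0, threshold }
    }

    /// Soft cap: an empty segment accepts the first write however large it is,
    /// so the log never rotates to a new segment at the same base offset.
    pub fn would_overflow(&self, wire_size: u64) -> bool {
        self.size_bytes > 0 && self.size_bytes.saturating_add(wire_size) > self.threshold
    }

    pub fn append(&mut self, wire_size: u64) {
        self.size_bytes += wire_size;
    }
}
```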
Changed
- Internal constant ZSTD_DECOMPRESS_MAX_SIZE renamed to DECOMPRESS_MAX_SIZE
and now applies uniformly to LZ4 and Zstd.
- README: Producer::send_batch moved from the v0.2 roadmap to the v0.1 list. The
kafko-http /produce_batch bullet was dropped from the roadmap: it is
an improvement for the measurement harness, not the library.