Skip to content

Kafka connect shredding#16369

Closed
soumilshah1995 wants to merge 3 commits into
apache:mainfrom
soumilshah1995:kafka-connect-shredding
Closed

Kafka connect shredding#16369
soumilshah1995 wants to merge 3 commits into
apache:mainfrom
soumilshah1995:kafka-connect-shredding

Conversation

@soumilshah1995
Copy link
Copy Markdown

No description provided.

soumilshah199500 and others added 3 commits May 15, 2026 09:06
This commit adds automatic variant shredding support for Kafka Connect
and other tools using generic Record types. Variant shredding optimizes
Parquet storage by extracting frequently-occurring fields from variant
data into typed columns for better compression and query performance.

Key Changes:
- Add RecordVariantShreddingAnalyzer for generic Record types
- Update GenericFormatModels to enable variant shredding with buffering
- Add comprehensive documentation for configuration and usage

Implementation Details:
- Buffers first N rows (default: 100) for schema analysis
- Automatically selects fields appearing in ≥10% of rows
- Limits shredding to 300 fields and 50 nesting levels
- Uses GenericRecord::copy for safe row buffering

Configuration:
- Enable: write.parquet.shred-variants=true
- Buffer size: write.parquet.variant-inference-buffer-size=100

Benefits:
- Better Parquet compression through typed columns
- Faster queries with predicate pushdown
- Lower storage costs
- Zero code changes required in Kafka Connect

Memory Considerations:
- Memory usage: buffer_size × concurrent_writers × row_size
- Recommended: Keep buffer small (100-500) for production
- Increase target file size to reduce concurrent writers

Limitations:
- No explicit field control (automatic selection only)
- Per-file analysis (schemas may vary across files)
- Requires buffering (adds latency for first N rows)

Future Enhancements:
- Schema hints for zero-buffering production deployments
- Global sampling with schema caching
- Field whitelisting/blacklisting

See KAFKA_CONNECT_VARIANT_SHREDDING.md for detailed documentation.

Co-authored-by: Cursor <cursoragent@cursor.com>
Register RecordVariantShreddingAnalyzer and Record::copy on the generic
Parquet format model, and resolve VARIANT column indices from Iceberg
schema order so analysis works with Void engine schemas (Connect path).

Co-authored-by: Cursor <cursoragent@cursor.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants