[kafka_actions] Add plugin architecture for formats and compression codecs#23650
[kafka_actions] Add plugin architecture for formats and compression codecs#23650piochelepiotr wants to merge 3 commits intomasterfrom
Conversation
…odecs Introduces a pluggable interface for message format handlers and app-level payload compression codecs, discovered via the 'datadog_kafka_actions.formats' and 'datadog_kafka_actions.compressions' entry-point groups. Existing built-in formats (json, string, raw, bson, avro, protobuf) are now first-class plugins registered by the wheel itself. New 'value_compression' and 'key_compression' config keys decompress payloads before deserialization. No compression codecs ship in core; install a plugin wheel (e.g. datadog-kafka-deserializers from integrations-extras) to add them. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 81ca3f7783
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| 'value_compression': config.get('value_compression'), | ||
| 'key_compression': config.get('key_compression'), |
There was a problem hiding this comment.
Add compression keys to action configuration schema
Wire-up in read_messages now reads value_compression/key_compression, but this commit does not add those fields to the integration configuration contract (kafka_actions/assets/configuration/spec.yaml) or generated model (kafka_actions/datadog_checks/kafka_actions/config_models/instance.py). In Remote Configuration flows, unsupported keys are rejected or dropped before reaching the check, so the new compression feature is effectively not configurable in production even though deserialization now expects it.
Useful? React with 👍 / 👎.
Codecov Report❌ Patch coverage is Additional details and impacted files🚀 New features to boost your workflow:
|
🎉 All green!❄️ No new flaky tests detected 🎯 Code Coverage (details) 🔗 Commit SHA: 371cd79 | Docs | Datadog PR Page | Give us feedback! |
- Use 2026-present in license headers to match repo standard - Revert __about__.py to 2.6.0; version bump is handled at release time by towncrier consuming changelog.d/ entries Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Validation ReportAll 20 validations passed. Show details
|
What does this PR do?
Adds a pluggable interface for message format handlers and app-level payload compression codecs to the
kafka_actionscheck, discovered via Python entry-point groups:datadog_kafka_actions.formats—FormatHandlersubclasses (one per message format)datadog_kafka_actions.compressions—CompressionCodecsubclasses (one per compression algorithm)Built-in formats (
json,string,raw,bson,avro,protobuf) are now first-class plugins registered by this wheel via the same entry points — no special-cased dispatch inMessageDeserializer.Two new config keys,
value_compressionandkey_compression, decompress payloads before format dispatch. No compression codecs ship in core; install a plugin wheel to add them. The companion plugin wheeldatadog-kafka-deserializers(separate PR in integrations-extras) providesmsgpackplusgzip/zlib/snappy/lz4/lz4_dd_hdr/zstd.Motivation
A growing set of Kafka producers across
dd-goanddd-sourceship payloads in formats and codecs that the check doesn't currently support — most notably MessagePack and a handful of app-level compressions (gzip,zstd, and a customlz4framing used by xray-converter). Hard-coding more handlers into core makes each new format akafka_actionschange with a full release cycle, and pulls dependency churn (msgpack,lz4,zstandard,python-snappy, ...) into a check that doesn't otherwise need them.The plugin model lets us:
Additional Notes
MessageDeserializer's public surface is unchanged. Existing callers and tests continue to work._bootstrap_format_handlers()direct-registers the built-in handlers on import so the check works in source-mode test runs withoutpip install.skip_bytes, so existingskip_bytessemantics are preserved.plugin-architecture.added; will rename to<PR#>.addedonce this PR has a number.Review checklist (to be filled by the PR author)
changelog/andintegration/labels attached🤖 Generated with Claude Code