[kafka_actions] add value_skip_bytes / key_skip_bytes to strip producer-side prefixes #23556
…ialization Adds value_skip_bytes and key_skip_bytes to read_messages config. When non-zero, the deserializer drops that many bytes from the start of the record before doing any further work (raw / json / bson / protobuf / avro decoding, and Confluent Schema Registry magic-byte detection). Use case: producers commonly prepend a fixed-size envelope to the payload — a 1-byte version flag, a 4-byte tenant id, a non-Confluent schema-registry header, an application-level compression marker — that the integration's existing format paths don't recognize. Stripping a known fixed prefix lets the rest decode cleanly without adding a new format-specific code path for every variant. Default 0 (no bytes skipped); negative values and values exceeding the message length return clear deserialization errors instead of truncating silently.
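To make the ordering concrete, here is a minimal sketch of why the skip has to happen before Schema Registry magic-byte detection. It is not the integration's code: `split_confluent_frame` is a hypothetical helper, and the 1-byte version flag in front of the record is an assumed producer envelope. The only external fact relied on is the Confluent wire format (magic byte `0x00`, then a 4-byte big-endian schema ID, then the payload).

```python
import struct


def split_confluent_frame(data: bytes):
    """Hypothetical helper: return (schema_id, payload) when data starts with
    the Confluent wire-format header, otherwise (None, data) unchanged."""
    if len(data) >= 5 and data[0] == 0:
        (schema_id,) = struct.unpack(">I", data[1:5])
        return schema_id, data[5:]
    return None, data


# Assumed producer framing: 1-byte version flag, then a Confluent-framed payload.
record = b"\x03" + b"\x00" + struct.pack(">I", 42) + b'{"ok": true}'

# Without skipping, the first byte is the version flag, so detection fails.
unskipped_id, unskipped_payload = split_confluent_frame(record)

# With value_skip_bytes = 1, the magic byte is exposed and detection succeeds.
value_skip_bytes = 1
schema_id, payload = split_confluent_frame(record[value_skip_bytes:])
```

In this sketch the unskipped record is returned untouched with no schema ID, while the skipped record yields schema ID 42 and the bare JSON payload.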
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2d7201bb55
    if field_name in info.context['configured_fields']:
        value = getattr(validators, f'instance_{info.field_name}', identity)(value, field=field)

        if info.field_name in SECURE_FIELD_NAMES:
            validation.security.check_field_trusted_provider(
                info.field_name, value, info.context.get('security_config')
            )
    else:
Restore trusted-provider checks for TLS path fields
The _validate branch for configured fields no longer calls validation.security.check_field_trusted_provider, so schema_registry_tls_ca_cert, schema_registry_tls_cert, schema_registry_tls_key, tls_ca_cert, tls_cert, and tls_private_key are now accepted without trusted-provider enforcement. This is a security regression for remote/fleet-configured instances because untrusted config can again supply local file paths for sensitive TLS material that were previously guarded.
…ance.py The previous commit ran ddev validate models with an older ddev (14.3.0) that doesn't recognize the require_trusted_provider flag in spec.yaml. The regenerated instance.py dropped both the SECURE_FIELD_NAMES set and the validation.security.check_field_trusted_provider call from the field _validate branch, regressing the security enforcement on: schema_registry_tls_ca_cert, schema_registry_tls_cert, schema_registry_tls_key, tls_ca_cert, tls_cert, tls_private_key. For remote/fleet-configured instances, untrusted config could again supply local file paths for sensitive TLS material. Restoring the SECURE_FIELD_NAMES frozenset and the security check matches what master's instance.py looked like before this PR. Spec.yaml already declares require_trusted_provider: true on these fields; only the generated mirror needed restoring.
Apply ruff format to message_deserializer.py and tests, and add the changelog entry required by the PR check.
Validation Report: All 20 validations passed.
Codecov Report: ✅ All modified and coverable lines are covered by tests.
🎉 All green! ❄️ No new flaky tests detected. 🔗 Commit SHA: dd2192a
Summary
Adds two new fields on read_messages: value_skip_bytes and key_skip_bytes (both int, default 0). When non-zero, the deserializer drops that many leading bytes from the record before applying any other logic: raw / json / bson / protobuf / avro decoding, and Confluent Schema Registry magic-byte detection.
Motivation
Producers commonly prepend a fixed-size envelope to the payload that kafka_actions's existing format paths don't recognize:
- … [0x03][framed-snappy][protobuf])
When the prefix length is known and constant, stripping N bytes is a one-liner that lets the rest of the record decode cleanly through the existing format paths without adding a per-variant format. For producer-side framings that also compress (e.g. Datadog's V3 with framed Snappy), skip_bytes won't be enough on its own, but it's the natural minimal building block, and a follow-up envelope/decompression knob can layer on top of it.
Behavior
- skip_bytes is applied before raw / json / bson / protobuf / avro decoding, before Schema Registry magic-byte detection, and before the raw base64 path.
- skip_bytes < 0 returns <deserialization error: skip_bytes must be non-negative, got N>.
- skip_bytes > len(message) returns <deserialization error: skip_bytes=N exceeds message length M>.
- skip_bytes == len(message) returns None (empty after strip).
- 0 is a no-op; the file's existing test suite still passes unchanged.
Test plan
- tests/test_message_deserializer.py:
  - skip_bytes=0 is identical to omitting the param
  - raw format
  - None when skipping the entire message
  - DeserializedMessage with value_skip_bytes set on the value but not the key
  - value_uses_schema_registry=True (skip strips an outer envelope, then SR magic byte is honored)
- ddev validate models -s kafka_actions reports models in sync with spec.yaml.
Notes
- ddev validate config -s kafka_actions reports pre-existing errors on the schema_registry_tls_* fields about require_trusted_provider; these reproduce on clean master and are not introduced by this PR.
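The rules listed under Behavior can be sketched as a small standalone function. This is an illustration under stated assumptions, not the code in message_deserializer.py: the name `strip_prefix` and its signature are hypothetical, and only the two error strings and the None result are taken from the description above.

```python
import json
from typing import Optional, Union


def strip_prefix(message: bytes, skip_bytes: int = 0) -> Union[bytes, str, None]:
    """Hypothetical sketch of the prefix-skip rules described in this PR.

    Returns the remaining bytes, None when nothing is left after the strip,
    or an error string for an invalid skip_bytes value.
    """
    if skip_bytes == 0:
        # Default: no-op, the message passes through unchanged.
        return message
    if skip_bytes < 0:
        return f"<deserialization error: skip_bytes must be non-negative, got {skip_bytes}>"
    if skip_bytes > len(message):
        return (
            f"<deserialization error: skip_bytes={skip_bytes} "
            f"exceeds message length {len(message)}>"
        )
    if skip_bytes == len(message):
        # The whole message was prefix; nothing remains to decode.
        return None
    return message[skip_bytes:]


# Assumed producer framing for the example: a 4-byte tenant id before a JSON body.
framed = b"\x00\x00\x00\x07" + b'{"event": "login"}'
body = strip_prefix(framed, skip_bytes=4)
decoded = json.loads(body)
```

Returning an error string rather than raising mirrors the description's wording that invalid values "return" deserialization errors; whether the real deserializer shares this exact control flow is an assumption.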