cdc: support top-level compression option for kafka#166390
cdc: support top-level compression option for kafka#166390techcodie wants to merge 4 commits intocockroachdb:masterfrom
Conversation
The kafka sink historically supported the "Compression" option inside kafka_sink_config, but rejected "compression" as a top-level option. This commit allows using the "compression" top-level option with the kafka sink. It reads the top-level option and applies the correct Codec to the sarama config. Fixes cockroachdb#113495
|
Merging to
|
|
Thank you for contributing to CockroachDB. Please ensure you have followed the guidelines for creating a PR. Before a member of our team reviews your PR, I have some potential action items for you:
🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
|
Thank you for updating your pull request. Before a member of our team reviews your PR, I have some potential action items for you:
🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
|
Thank you for updating your pull request. Before a member of our team reviews your PR, I have some potential action items for you:
🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
…KafkaConfig signature
|
Thank you for updating your pull request. Before a member of our team reviews your PR, I have some potential action items for you:
🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
|
Thanks for the review! I've updated both the v1 (sarama) and v2 (kgo) paths to include a comment clarifying that if both the top-level compression option and the Producer.Compression in kafka_sink_config are set, the top-level option takes precedence. This keeps the behavior consistent with how other sinks prioritize top-level options over sink-specific JSON configurations. I also fixed the buildKafkaConfig signature in the v1 path which was previously referencing sinkOpts without it being in the parameter list. The updated commit has been pushed to the branch. |
Description
The kafka sink historically supported the
"Compression"option inside thekafka_sink_configJSON object, but rejectedcompressionas a top-level option. This created inconsistency and confusion, as documented in #113495.This commit allows using the
compressiontop-level option with the kafka sink. It reads the top-level option (if present) and applies the correct codec to the sarama configuration, making it align with how other sinks receive compression settings.Fixes #113495
Release Notes
compressionoption inCREATE CHANGEFEEDstatements, alongside thekafka_sink_configoption.