[CLI-3630] Support Schema IDs in headers #3163
Steven Gagniere (sgagniere) merged 33 commits into main
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull Request Overview
This PR updates the confluent-kafka-go library to version 2.11.0 to support serializing and deserializing schema IDs in message headers instead of the traditional message prefix. The changes introduce a new --schema-id-header flag for the confluent kafka topic produce command and automatically support header-based schema IDs in the consume command.
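For context, the traditional approach encodes the schema ID as a five-byte prefix on the payload: a 0x00 magic byte followed by a big-endian ID. A minimal Go sketch of parsing that prefix (the helper name is ours, not from this PR):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// parseSchemaIDPrefix splits a Confluent wire-format payload into its
// schema ID and the remaining serialized record bytes. The wire format
// is a 0x00 magic byte followed by a 4-byte big-endian schema ID.
func parseSchemaIDPrefix(payload []byte) (int32, []byte, error) {
	if len(payload) < 5 || payload[0] != 0 {
		return 0, nil, fmt.Errorf("payload does not start with a wire-format prefix")
	}
	id := int32(binary.BigEndian.Uint32(payload[1:5]))
	return id, payload[5:], nil
}

func main() {
	// 0x00 magic byte, schema ID 42, then the encoded record bytes
	id, rest, err := parseSchemaIDPrefix([]byte{0, 0, 0, 0, 42, 'h', 'i'})
	fmt.Println(id, string(rest), err) // 42 hi <nil>
}
```

Moving the ID into a message header leaves the payload itself free of this prefix, which is what the new flag and the updated consume path handle.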
Key changes:
- Updated `confluent-kafka-go` dependency from v2.8.0 to v2.11.0
- Added `--schema-id-header` flag to serialize schema IDs in headers for produce operations
- Refactored serialization/deserialization interfaces to support headers and updated all provider implementations
- Consolidated Schema Registry client initialization logic and improved test coverage for header-based schema IDs
Reviewed Changes
Copilot reviewed 23 out of 24 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| go.mod | Updated confluent-kafka-go dependency to v2.11.0 |
| pkg/serdes/serdes.go | Added header support to serialization/deserialization interfaces and consolidated SR client initialization |
| pkg/serdes/*_serialization_provider.go | Updated all serialization providers to return headers alongside serialized data |
| pkg/serdes/*_deserialization_provider.go | Updated all deserialization providers to accept headers and use shared SR client initialization |
| pkg/serdes/serdes_test.go | Added comprehensive test coverage for header-based schema ID serialization/deserialization |
| internal/kafka/command_topic_produce.go | Added --schema-id-header flag support and updated message serialization to include headers |
| internal/kafka/confluent_kafka.go | Refactored consume logic to support header-based schema IDs and removed legacy RequestSchema method |
| internal/kafka/confluent_kafka_configs.go | Enhanced debug logging configuration for producer/consumer |
| internal/asyncapi/command_export.go | Updated to use new deserialization interface with headers |
| test/fixtures/output/kafka/topic/*.golden | Updated test fixtures for help text and error messages |
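To illustrate the consume-side behavior described above, here is a hedged Go sketch of preferring a header-carried schema ID and falling back to the classic payload prefix. The header key name `value.schema.id` and all identifiers are illustrative assumptions, not the library's actual API:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Header mirrors the key/value shape of a Kafka message header.
type Header struct {
	Key   string
	Value []byte
}

// resolveSchemaID prefers a schema ID carried in a message header and
// falls back to the classic 5-byte wire-format payload prefix. The
// header key "value.schema.id" is a placeholder, not the real name
// used by confluent-kafka-go.
func resolveSchemaID(headers []Header, payload []byte) (int32, []byte, bool) {
	for _, h := range headers {
		if h.Key == "value.schema.id" && len(h.Value) == 4 {
			// Header-based: payload carries only the record bytes.
			return int32(binary.BigEndian.Uint32(h.Value)), payload, true
		}
	}
	if len(payload) >= 5 && payload[0] == 0 {
		// Prefix-based: strip the magic byte and 4-byte ID.
		return int32(binary.BigEndian.Uint32(payload[1:5])), payload[5:], true
	}
	return 0, payload, false // plain message, no schema involved
}

func main() {
	hdrs := []Header{{Key: "value.schema.id", Value: []byte{0, 0, 0, 7}}}
	id, _, ok := resolveSchemaID(hdrs, []byte("record-bytes"))
	fmt.Println(id, ok) // 7 true
}
```

The final `false` branch matches the plain-string case a reviewer asks about below: a message with no header and no prefix should pass through undeserialized.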
```
@@ -67,7 +45,6 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId,
	}

	deser, err := avrov2.NewDeserializer(serdeClient, serdeType, serdeConfig)
```

[nitpick] Removed blank line between the deserializer creation and error check. While this works, the previous formatting with the blank line was more consistent with the style used elsewhere in the codebase.
As a precautionary sanity check, can we add one more manual test case that produces and consumes a plain string message (no schema involved)?
Channing Dong (channingdong) left a comment:
LGTM


Release Notes
Breaking Changes
New Features
- `confluent kafka topic consume` now supports consuming messages with schema IDs serialized in the header

Bug Fixes
Checklist
- …What section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
- …Test & Review section below.
- …Blast Radius section below.

What
Support schema IDs in headers for `confluent kafka topic [produce | consume]` for both Cloud and On-prem.

Blast Radius
The update to the functionality is additive, but the implementation required changes to the serdes package, so in the worst case bugs could impact the deserialization of JSON/Avro/Protobuf messages in `kafka topic consume` and `asyncapi export`.

References
Test & Review
Manual testing: https://docs.google.com/document/d/13e62TwsqCSwT6FU587Gm03Vz_VNFP4c9XrCw2anK8oI/edit?usp=sharing