Add support for Kafka headers #881
Comments
How would you migrate a non-header registry topic to a new header-based topic? Create a custom script to "move" the first 5 bytes into the header? I'm not sure I understand the backwards-compatibility portion of your comment, because at that point wouldn't you either be duplicating data or just relying on the existing format? And what other systems are you referring to? AFAIK, most systems supporting Avro require the AVSC schemas, not only raw bytes, and existing deserializer implementations such as Flink, as one example, don't expect or expose message headers in their Kafka events, last time I checked.
I'd leave this up to the user. This would be trivial with the backward compatibility mode I mentioned. The user would simply read and write messages normally -- messages would be read correctly due to the backward compatibility mode, and written with the new Kafka header as that would be the new default mode of operation.
No. Backward compatibility is only at deserialization time -- if the header exists it is used (no magic in the payload). If the header doesn't exist, the deserializer falls back to reading the magic. There is no duplication of data, just a backward compatible deserializer. Standard deprecation cycle: eventually this gets dropped. I think you'd also need an option on the serializer to continue using magic for now, until the ecosystem catches up... see below -- but I don't think this would be the default mode.
I'm not surprised systems like Flink don't do this yet -- Kafka headers are very new and it will take time for the ecosystem to catch up. However, you seem to be implying that because Flink does not do this now, it never will. I doubt that's the case -- headers are very useful in many situations (see the reasoning in the KIP that introduced them). By other systems, I mean any system that does not rely on the existing serializers/deserializers from Confluent. Currently, to implement such a system, I need to a) just know that the data I'm reading is Avro data with the magic bytes, and b) understand the wire protocol -- I need to grab the bytes, I need to understand they are big-endian (not unusual, but still), I need to know to drop the first byte and use the next 4 as the schema id. Then I need to read the rest of the message as the actual data. Instead, with headers, I would just see headers indicating it's an Avro message and the associated schema id, grab the schema from the registry, and pass it and the payload to my Avro deserializer.
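To make the contrast concrete, here is a minimal consumer-side sketch of the fallback described above. It is only an illustration: the `schema.id` header key and the encoding of the header value are assumptions, not an existing Confluent API.

```java
import java.nio.ByteBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class SchemaIdResolver {

    // Hypothetical header key -- the actual name would have to be agreed upon.
    private static final String SCHEMA_ID_HEADER = "schema.id";
    private static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema id, preferring a header and falling back to the current wire format. */
    public static int resolveSchemaId(ConsumerRecord<byte[], byte[]> record) {
        Header header = record.headers().lastHeader(SCHEMA_ID_HEADER);
        if (header != null) {
            // Header present: the payload is a plain Avro message, the id lives in the header.
            return ByteBuffer.wrap(header.value()).getInt();
        }
        // Fallback: Confluent wire format -- magic byte followed by a 4-byte big-endian schema id.
        ByteBuffer buffer = ByteBuffer.wrap(record.value());
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt(); // the remaining bytes are the Avro payload
    }
}
```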
@rocketraman I certainly see value in your proposal. Would you be interested in providing a patch for this?
Now that the schema registry supports JSON via JsonSchema, I think this has become very relevant: there is no need for a schema per se to read JSON data, so the wire format becomes something of an impediment here.
The wire format still matters for backwards compatibility, and it separates plain JSON messages from Schema Registry-backed JSON Schema messages (note: last I checked, JSON Schema itself is not finalized).
I think no one wants to drop the wire format per se; what this suggests is to move the schema id information to the headers, with a fallback if it's not there, so as far as I understand this thread, this would be fully "backward compatible". W.r.t. JSON Schema not being finalized, it seems to be fully part of the OSS Confluent distribution, and even if it isn't, this is a proposal which will anyway require a bit of time to be implemented (for all supported serde frameworks).
I think it would be nice if the wire format were configurable. I understand this would have a great impact on testing and backwards compatibility, but it would make it 'easy' to move from one format to another, as you can use MirrorMaker/Replicator and have the consumer and producer configured for different formats. The consumers could default to the current format to be backwards compatible. I think it would also be nice if the header-based format were compatible with CloudEvents (https://cloudevents.io/), although that brings additional challenges, since e.g. for the 'id' there is no clear way to derive it.
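For reference, the CloudEvents Kafka protocol binding (binary content mode) carries event attributes in `ce_`-prefixed record headers. A rough sketch of how a schema id header could sit alongside them follows; the topic, event type, and the `schema.id` key are purely illustrative, not part of any spec.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CloudEventsHeadersSketch {

    static ProducerRecord<byte[], byte[]> toCloudEventRecord(byte[] avroPayload, int schemaId) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("orders", null, avroPayload);
        record.headers()
              .add("ce_specversion", utf8("1.0"))
              .add("ce_id", utf8(UUID.randomUUID().toString()))      // the 'id' mentioned above
              .add("ce_source", utf8("/orders/service"))
              .add("ce_type", utf8("com.example.OrderCreated"))
              .add("content-type", utf8("application/avro"))
              .add("schema.id", ByteBuffer.allocate(4).putInt(schemaId).array()); // hypothetical key
        return record;
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```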
Any update on this? Apicurio added support for this in version 1.3 (see Apicurio/apicurio-registry#642). Having this feature in the Confluent Schema Registry would be really nice so that one can use the CloudEvents format.
Would Confluent accept an MR for this issue? The intention would be to provide a configurable, opt-in-only schema id writer, which would stick with the magic byte + schema id in the payload by default and allow other implementations to be configured that store the ids in the headers (with or without using the CloudEvents naming).
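A sketch of what such an opt-in writer could look like as a small strategy interface; neither the interface nor its methods exist in the current serializers, and the names are illustrative only.

```java
import org.apache.kafka.common.header.Headers;

/**
 * Hypothetical pluggable schema id writer. The default implementation would keep the
 * current behaviour (magic byte + id prepended to the payload); a header-based
 * implementation would leave the payload untouched and add a record header instead.
 */
public interface SchemaIdWriter {

    /** Bytes to prepend to the serialized payload (empty for header-based strategies). */
    byte[] payloadPrefix(int schemaId);

    /** Headers to add to the record (a no-op for the magic-byte strategy). */
    void addHeaders(int schemaId, Headers headers);
}
```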
Thanks @gustavomonarin, we're considering this. The Java consumer is not the only component that would need to understand this; the Python, Go, .NET, and other clients would as well. Additionally, we have front-end JavaScript code for our UIs that would need to be enhanced. Also, there are certain scenarios in Kafka Streams and ksqlDB where headers are not propagated, so we would need to figure out how to address those cases as well.
What's the vision of Confluent on this issue? Is the plan to support a

Thanks
@woile just a single
@gklijs not sure if I understood your message correctly. Moving the schema id value out of the beginning of the payload would have a huge impact, especially for JSON Schema. Currently, having the id at the beginning of the payload makes the JSON payload quite unconventional, not to say unusable, and makes a mess of a transition that could otherwise be quite smooth.
@gustavomonarin please expand on why you think this. If you use Schema Registry, the clients already take care of removing those bytes at the beginning. If you don't use Schema Registry, you can encode which schema you are using in whatever way you like. To transition, I think it would be better to move to other topics anyway; that way you are sure about what's there instead of hoping for the best.
@gklijs There are lots of tools that cannot directly integrate with Schema Registry, and it is a hassle to always have the additional authentication configured in every tool. It is a pity that we have to discuss this again, after this issue has been open and heavily upvoted for more than five years.
@albrechtflo-hg So don't use Schema Registry if you can't use a schema registry client for what you are doing. I don't see how this issue would help a lot, except when used with JSON Schema, where the payload is easily readable. I'm not sure what authentication has to do with this specific issue?
@gklijs Primarily, we want to make sure that producers adhere to a published schema when writing messages. Consumers shall also be able to verify that, but that is not a "must". Another scenario: if a developer just quickly wants to inspect messages on a topic, they can use the Offset Explorer software. Yes, that is schema-registry aware, but again, why fiddle with all the config stuff if I just want to view the messages on the topic? A developer usually does not care in that moment whether the messages are schema-valid or not. If the messages were plain text (and, to say it clearly, it is simply bad design to have meta information in the message payload if it is no longer required for technical reasons), I would not even have to use Confluent-specific libs to read from this topic. Just as it should be: reduce complexity where you don't need it.
You can also use the 'reduce complexity where you don't need it' argument to keep things as they are, given the challenges mentioned by Robert. If you only care about the production side, why not just have the schema in a separate repo and not use Schema Registry? I have seen that work great with Protobuf. The whole idea is to have a reference to the schema included (whether in the payload bytes or in a header) so you can always fetch the exact schema used to produce the data if needed.
We do not care "only" about the production side, and I explicitly added "Consumers shall be able to verify that". Also, please note that we use CloudEvents as the format basis for our messages, and the Confluent schema registry wire format is incompatible with their Kafka specification. And yes, I want to use it together with Schema Registry. That is the whole point of this issue here: improving the product, not determining what is not possible today.
As long as this issue is open, anything pro or con is worthwhile, and it goes in all sorts of other directions. It seems the advantage of moving things to headers is to make some tools easier to use, which is fair. However, it does make all the schema registry clients more complex, as they will need to support both formats and somehow know whether a message is in the old or the new format. For this, the magic byte can be used on the consumer side; on the producer side we can add a toggle for which format to use. We could also have a header with a key like
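As a sketch of such a producer-side toggle, under the assumption of config keys that do not exist today:

```java
import java.util.Properties;

public class ProducerFormatToggle {

    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        // Hypothetical keys -- neither exists in the current serializer config:
        props.put("schema.id.location", "header");       // "payload" would keep the old format
        props.put("schema.id.header.name", "schema.id"); // header key to use for the id
        return props;
    }
}
```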
Schema registry should migrate (over a major version, of course) from using a magic value inside the payload to specifying the schema id in a Kafka message header.
This would have the advantage of having the payload be a pure Avro message, which increases interoperability with other systems, and removes the custom wire protocol in favor of the upstream functionality that is now in Kafka.
For backward compatibility, the registry can fall back to magic if the header is not present.
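A minimal sketch of what the serializer side of this proposal could look like, assuming a hypothetical `schema.id` header key (schema registration and lookup against the registry are omitted):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.header.Headers;

public class HeaderBasedAvroSerializer {

    // Hypothetical header key; the real name would be decided as part of the change.
    private static final String SCHEMA_ID_HEADER = "schema.id";

    /** Serializes a record as plain Avro and puts the schema id into a record header. */
    public static byte[] serialize(GenericRecord record, int schemaId, Headers headers) throws IOException {
        headers.add(SCHEMA_ID_HEADER, ByteBuffer.allocate(4).putInt(schemaId).array());

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray(); // pure Avro: no magic byte, no embedded schema id
    }
}
```

The deserializer would do the reverse: read the id from the header if present, and otherwise fall back to the magic byte as described above.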