-
Notifications
You must be signed in to change notification settings - Fork 878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optionally return schemas with AvroProducer #402
Optionally return schemas with AvroProducer #402
Conversation
When calling poll, with_schemas kwarg may be provided to return a proxy for the message. That proxy includes both the key_schema and value_schema as properties. The raw message is also available as a property, and all other calls are delegated to the message.
@confluentinc It looks like @delphyne just signed our Contributor License Agreement. 👍 Always at your service, clabot |
@@ -81,6 +81,45 @@ def produce(self, **kwargs): | |||
super(AvroProducer, self).produce(topic, value, key, **kwargs) | |||
|
|||
|
|||
class AvroMessage(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually becomes fairly expensive on a per message basis. Rather than pulling the message type into python I'd suggest simply setting the additional attributes at runtime.
def decode_message(self, message, schema_attr=None):
...
if not schema_attr is None:
msg.__setattr__(schema_attr, key_schema_obj)
...
This should be fairly cheap although admittedly it takes away from the readability a bit. Proper documentation should be able to bridge the gap here though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it looks like the Message type fails to set the slot for __setattr__
, standby
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah... that's why i had to go this route. i'm open to changing it if you have a better plan though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rnpridgeon do you have updates on approach for this?
Hey gang! Wanted to bump this PR up to get some more eyes on it. I'm currently needing this exact functionality as well, and have subclassed things locally for now to get it working. A more integrated solution is obviously preferable so if I can help out in any way, please let me know! |
How about using the message headers for this? |
Generic Serde support will provide a means to bind your schema with your data. This is similar to java's 'GenericRecord' implementation. #502 |
When calling poll, with_schemas kwarg may be provided to return a proxy for the message. That proxy includes both the key_schema and value_schema as properties. The raw message is also available as a property, and all other calls are delegated to the message.
This PR is particularly helpful along with #401. Together they allow you to push multiple message types to the same topic and to be able to understand what you are getting back when consume from that topic.
Note: #289 suggests making AvroMessage a sub-class of confluent_kafka.cimpl.Message, but I could not find any way to make this work. This version has it delegate with access to the original if desired for performance reasons. In order to prevent this being a breaking change, the AvroMessage version of the message is only returned if schemas are explicitly requested.