
AvroProducer and AvroConsumer without key schema #608

Closed

kontrafiktion opened this issue May 29, 2019 · 8 comments

@kontrafiktion

Description

Our Java producers send messages with a schema for the value, but use a simple string for the key. The Python AvroProducer does not allow a key without a schema. We cannot simply change the existing Java producers. Is there a way to have the exact same string keys used on the Python side? How must the schema be defined? Or can we get rid of the requirement to have a schema for the key?

How to reproduce
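A minimal sketch of the setup described above (broker and registry addresses, topic, and schema are placeholders, not from the original report):

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.loads("""
{"type": "record", "name": "Example",
 "fields": [{"name": "field1", "type": "string"}]}
""")

producer = AvroProducer(
    {'bootstrap.servers': 'localhost:9092',
     'schema.registry.url': 'http://localhost:8081'},
    default_value_schema=value_schema)

# The Java producers use plain string keys; AvroProducer rejects this
# because no key schema is configured (raises KeySerializerError).
producer.produce(topic='example-topic', key='plain-string-key', value={'field1': 'hello'})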


Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('0.11.6', 722432) ('0.11.6', 722687)
@mhowlett
Contributor

mhowlett commented Jun 3, 2019

There's a general serde API in progress that will address this (#502). Work will commence on that again before too long ...

@kontrafiktion
Author

For now we just created a subclass of AvroProducer.

In case someone else needs such a workaround:

from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.error import ClientError
from confluent_kafka.avro.serializer import ValueSerializerError


class PatchedAvroProducer(AvroProducer):
    """AvroProducer that Avro-encodes the key only if a key schema is given;
    without a key schema the key is passed through unserialized."""

    def __init__(self, config, default_key_schema=None, default_value_schema=None, schema_registry=None):
        super(PatchedAvroProducer, self).__init__(config, default_key_schema=default_key_schema,
                                                  default_value_schema=default_value_schema,
                                                  schema_registry=schema_registry)

    def produce(self, **kwargs):
        key_schema = kwargs.pop('key_schema', self._key_schema)
        value_schema = kwargs.pop('value_schema', self._value_schema)
        topic = kwargs.pop('topic', None)
        if not topic:
            raise ClientError("Topic name not specified.")
        value = kwargs.pop('value', None)
        key = kwargs.pop('key', None)

        if value is not None:
            if value_schema:
                value = self._serializer.encode_record_with_schema(topic, value_schema, value)
            else:
                raise ValueSerializerError("Avro schema required for values")

        # Unlike AvroProducer.produce, do not require a key schema:
        # with no schema the key is sent as-is (e.g. a plain string).
        if key is not None and key_schema:
            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)

        # Skip AvroProducer.produce and call the base Producer directly.
        super(AvroProducer, self).produce(topic, value, key, **kwargs)
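Hypothetical usage (addresses, topic, and schema are placeholders): the value is Avro-encoded as usual, while the key goes through as a plain string:

# requires: from confluent_kafka import avro
value_schema = avro.loads('{"type": "record", "name": "Example", "fields": [{"name": "field1", "type": "string"}]}')

producer = PatchedAvroProducer(
    {'bootstrap.servers': 'localhost:9092',
     'schema.registry.url': 'http://localhost:8081'},
    default_value_schema=value_schema)
producer.produce(topic='example-topic', key='plain-string-key', value={'field1': 'hello'})
producer.flush()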

@kontrafiktion
Author

Now we have the problem on the consumer side. When the Java producer sends a message with a key without a schema, we get an error:

"message does not start with magic byte"

from confluent_kafka/avro/serializer/message_serializer.py

We do not know whether there is a simple solution like our workaround above.

Would you consider that a bug?
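A consumer-side counterpart in the same spirit as the producer workaround above could look like this (a sketch against the legacy confluent_kafka.avro API, not an official solution): it Avro-decodes the value only and returns the key as a plain UTF-8 string.

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


class PatchedAvroConsumer(AvroConsumer):
    """AvroConsumer that Avro-decodes the value only; the key is
    returned as a plain UTF-8 string (no magic byte expected)."""

    def poll(self, timeout=None):
        if timeout is None:
            timeout = -1
        # Skip AvroConsumer.poll and fetch the raw message from the base Consumer.
        message = super(AvroConsumer, self).poll(timeout)
        if message is None or message.error():
            return message
        try:
            if message.value() is not None:
                message.set_value(self._serializer.decode_message(message.value()))
            if message.key() is not None:
                # Plain string key from the Java producers: no Avro decoding.
                message.set_key(message.key().decode('utf-8'))
        except SerializerError as e:
            raise SerializerError("Message deserialization failed: {}".format(e))
        return message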

@kontrafiktion kontrafiktion changed the title AvroProducer without key schema AvroProducer and AvroConsumer without key schema Jun 12, 2019
russau added a commit to russau/kafka-websockets that referenced this issue Dec 22, 2019
@rooom13

rooom13 commented Jan 16, 2020

I was getting

"message does not start with magic byte"

as well. I wrote my own "AvroCodec" for encoding and decoding messages with an Avro schema from the schema registry:

class AvroCodec():

    def __init__(self, schema_registry_url, subject):
        self.schema_registry = CachedSchemaRegistryClient(schema_registry_url)
        _, self.schema, _ = self.schema_registry.get_latest_schema(subject)
        self.reader = DatumReader(self.schema)
        self.writer = DatumWriter(self.schema)

    def decode_message(self, msg_value):
        """Decode a message using the Avro schema from the schema registry.

        Note: this reads the payload as plain Avro, i.e. it assumes there is
        no Confluent wire-format header (magic byte + schema id) in front.
        """
        message_bytes = io.BytesIO(msg_value)
        decoder = BinaryDecoder(message_bytes)
        event_dict = self.reader.read(decoder)
        return event_dict

    def encode_message(self, msg_dict):
        """Encode a message using the Avro schema from the schema registry."""
        bytes_writer = io.BytesIO()
        encoder = BinaryEncoder(bytes_writer)
        self.writer.write(msg_dict, encoder)
        return bytes_writer.getvalue()

usage:

avro_codec = AvroCodec(SCHEMA_REGISTRY_URL, SUBJECT)
msg = avro_codec.decode_message(msg.value())

dependencies:

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from avro.io import DatumReader, BinaryDecoder, BinaryEncoder, DatumWriter
import io

@SoerenHenning

We're currently still using the patch by @kontrafiktion. Can someone say whether it is now possible to use the Confluent Python library without a key schema, i.e. with only a value schema?

@SoerenHenning

@edenhill So the way to go would now be to use a DeserializingConsumer/SerializingProducer, specifying individual key/value serializers/deserializers? Is this right?

@mhowlett
Contributor

Yes. Apologies that we've been slow to stabilize the new API - we don't anticipate significant changes.
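For reference, a minimal sketch of that setup (addresses, topic, group id, and schema are placeholders; the AvroSerializer/AvroDeserializer argument order shown matches recent releases):

from confluent_kafka import DeserializingConsumer, SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import StringDeserializer, StringSerializer

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

value_schema_str = """
{"type": "record", "name": "Example",
 "fields": [{"name": "field1", "type": "string"}]}
"""

# Plain string key, Avro value -- no key schema involved.
producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': AvroSerializer(schema_registry_client, value_schema_str),
})
producer.produce(topic='example-topic', key='plain-string-key', value={'field1': 'hello'})
producer.flush()

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': AvroDeserializer(schema_registry_client),
})
consumer.subscribe(['example-topic'])
msg = consumer.poll(10.0)
if msg is not None:
    print(msg.key(), msg.value())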

@nbonavia

I ran into the same issue and, for the time being, I am using @kontrafiktion's workaround to mitigate it.
