
Unified Redpanda and Confluent schema registries #399

Merged: 8 commits merged into main from schema-registries-fixes on Mar 7, 2024

Conversation

Psykopear (Contributor):

As noted here, given that the Redpanda schema registry's compatibility with Confluent's library is taken for granted (even if not officially stated), this PR unifies the two SchemaRegistry classes into a single one that uses Confluent's SchemaRegistryClient under the hood.

The distinction between plain Avro and Confluent's bytes+Avro format is still useful, but it is now a choice you make when instantiating the (de)serializers rather than when picking the right "schema registry", which makes a lot more sense.

Notes:
The format of a schema could be retrieved from the response given to the client, but Confluent doesn't distinguish between plain Avro and their own format, so we can't really know which format is being used beforehand. The specific serialization format therefore has to be passed to the .serializer and .deserializer methods.


# Deserialize both key and value
key_de = registry.deserializer(SchemaRef("sensor-key"))
val_de = registry.deserializer(SchemaRef("sensor-value"))
key_de = registry.deserializer(SchemaRef("sensor-key"), serde_format="plain-avro")
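For context, a rough sketch of how this might be wired up end to end. The SchemaRegistry and SchemaRef names are the ones from this PR; the registry URL and subject names are placeholders, and it assumes the unified registry can be handed an already-configured SchemaRegistryClient (it may instead build one internally from config):

from confluent_kafka.schema_registry import SchemaRegistryClient

# The same client works for both Confluent and Redpanda registries, since
# Redpanda exposes a Confluent-compatible API.
client = SchemaRegistryClient({"url": "http://localhost:8081"})
registry = SchemaRegistry(client)

# Serialize the value on the producing dataflow...
val_ser = registry.serializer(SchemaRef("sensor-value"))
# ...and deserialize key and value on the consuming one.
key_de = registry.deserializer(SchemaRef("sensor-key"))
val_de = registry.deserializer(SchemaRef("sensor-value"))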
Contributor:

I'm wondering where the user should parameterize the format of the deserializer, given that at some point, someone will probably want to deserialize protobuf messages. In this proposed structure, would serde_format also take protobuf as an argument, or is there a different structure we should consider?

Psykopear (author):

Yep, this same arg can be used to specify other formats too; since we are only initializing the (de)serializers here, we can pick any other one.
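For example (purely hypothetical; Protobuf support isn't part of this PR, this just shows where the kwarg would go):

val_de = registry.deserializer(SchemaRef("sensor-value"), serde_format="protobuf")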

(Resolved review threads on pysrc/bytewax/connectors/kafka/operators.py and pysrc/bytewax/connectors/kafka/serde.py.)
Comment on lines 110 to 122
if serde_format == "confluent-avro":
if schema_ref is not None:
_logger.warning("schema_ref supplied to deserialized will be ignored")
return ConfluentAvroDeserializer(self.client)
elif serde_format == "plain-avro":
assert (
schema_ref is not None
), "schema_ref required for 'plain-avro' deserialization format"
schema = self._get_schema(schema_ref)
return PlainAvroDeserializer(schema)
else:
msg = f"Unsupported serde_format: {serde_format}"
raise ValueError(msg)
Contributor:

This mix of asserts, exceptions, and warnings here leads me to believe this design is not resulting in the correct layers of abstraction. I agree with Dan's comment above, but I think the mismatch is more than just the serialization format. This design feels like it'll require our intervention to extend it to new formats.

I think there are four independent questions that need to be answered for a given deployment:

  1. Registry Client - How do I talk to the schema registry? URL, certs, auth, API (although the original claim here is that Confluent and Redpanda have identical APIs, so this is not an issue here today).

  2. Schema Logic - Where do I get the schema I want to use for a given message? Is it magic bytes in the Kafka message? Does the registry know it because of the topic name? Is it a manual convention and it needs to be manually specified? I'm sure you could come up with other conventions too.

  3. Format - What is the format we want to use to serialize an item with a given schema? Avro, JSON, Protobuf, etc.

  4. Serde - How do I actually do the bytes from/to dict conversion. This requires both knowing the format and the schema.

There are also some dependencies here, although this isn't super formal:

graph TD

R[Registry]
M[Message]
T[Topic]
S[Schema]
L[Schema Logic]
F[Format]
SD[Serde]

M -. or .-> L
T -. or .-> L
R --> L
L --> S
S --> SD
F --> SD

But it looks like confluent_kafka kind of already provides all these primitives? Is there a way we could re-use them and not need to re-build this hierarchy?

Point 1 is already fully encapsulated by the confluent_kafka.schema_registry.SchemaRegistryClient.

confluent_kafka.schema_registry.avro.AvroSerializer and confluent_kafka.schema_registry.protobuf.ProtobufSerializer already have the subject.name.strategy option to handle Point 2, and instantiating each of those also solves Points 3 and 4.

This seems to reduce our work to the question of how to call the final closure in the producer https://github.com/confluentinc/confluent-kafka-python/blob/5a87879681d28375a55203c4839338b13b668046/examples/protobuf_producer.py#L86-L89 and the consumer https://github.com/confluentinc/confluent-kafka-python/blob/5a87879681d28375a55203c4839338b13b668046/examples/protobuf_consumer.py#L60
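Roughly, the producer-side pattern from those linked examples looks like this (paraphrased; the broker/registry URLs, topic name, and schema are placeholders):

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_str = '{"type": "record", "name": "Sensor", "fields": [{"name": "temp", "type": "double"}]}'
sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(sr_client, schema_str)

producer = Producer({"bootstrap.servers": "localhost:9092"})
topic = "sensors"
# The serializer closure gets a SerializationContext carrying the destination
# topic and which field (key or value) is being serialized.
producer.produce(
    topic=topic,
    value=avro_serializer({"temp": 21.5}, SerializationContext(topic, MessageField.VALUE)),
)

# The consumer side is symmetric: the deserializer closure is called with the
# topic the message actually came from, e.g.
#   avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))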


The above objects obviously work for the Confluent schema registry because they were designed for that. But the observation is that they should also work for the Redpanda schema registry because the API is compatible?

Is there some "serde strategy" that using Redpanda requires us to support, but confluent_kafka.schema_registry.protobuf.ProtobufSerializer and friends do not?

Psykopear (author), Feb 15, 2024:

Schema Logic - Where do I get the schema I want to use for a given message? Is it magic bytes in the Kafka message? Does the registry know it because of the topic name? Is it a manual convention and it needs to be manually specified? I'm sure you could come up with other conventions too.

I think that's the reason why Confluent adds the magic bytes. You usually need to know the schema_id beforehand, or learn it through other, unspecified means. The schema object returned from the registry then tells you its format, so we could instantiate the correct deserializer by looking at the schema. The problem is that Confluent's schema registry calls their own special format AVRO, so we have no way of knowing whether a message is plain Avro or not from the response of the registry, hence why I added the option at deserializer instantiation.

I'll look more into docs and api to see if I missed something here

edit: To be clear, the main source of the problem is that their deserializer will fail on a plain Avro message; otherwise we'd be good with that.

Psykopear (author):

Adding some more context here:

Problem: Reading non-Avro data with AvroConverter
This may be the most common error that I see reported again and again on places like the Confluent Community mailing list and Slack group. It happens when you try to use the Avro converter to read data from a topic that is not Avro. This would include data written by an Avro serializer other than the Confluent Schema Registry’s Avro serializer, which has its own wire format.

I like how they define "data written by any avro serializer that's not ours" as "not avro". I understand the need for the wire format, but complaining that people want to deserialize correct avro messages with something called AvroConverter is funny.

This is the description of the wire format in their docs:
https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
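For reference, the framing that document describes is a single magic byte (0), a 4-byte big-endian schema ID, and then the Avro-encoded payload. A tiny illustrative sketch of splitting that framing off (not any library's API, just the byte layout):

import struct

def split_confluent_framing(raw: bytes):
    """Split a Confluent wire-format message into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("Not Confluent wire format")
    return schema_id, raw[5:]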

The same problem will apply to protobuf if we add support for it.

So, to sum up: the schema is effectively Avro (that's why they call it that in the registry), but the messages you serialize with their library are instead in the wire format.

My initial idea was to add the format to SchemaRef, so that when you instantiate a (de)serializer you already have all the info you need. The problem with doing that using Confluent's library is that the deserializer only accepts the wire format, and when it sees a different schema_id it will just download the new schema with no way of customizing the behavior. If we instead use only a plain Avro deserializer, it won't work with messages serialized by Confluent's library.

Contributor:

That is a good point. The serialization format and "where do you get the schema from" are not totally separate: the magic byte format is actually combining those two concerns.

I have some suggestions in the following review.

@@ -46,14 +61,34 @@ def __init__(self, client, schema_str):
conf={"subject.name.strategy": record_subject_name_strategy},
Contributor:

Why is this strategy being fixed? To interop with other systems, might this producer want to choose a different strategy?

Where does the "record name" come from? Is it derived from the schema str?

Update: Actually see the next comment before answering.

"""

def __init__(self, client: SchemaRegistryClient, schema_str: str):
"""Instantiate a ConfluentAvroSerializer."""
# Use a different "subject.name.strategy" than the default. See:
# https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy
self.serializer = AvroSerializer(
Contributor:

Thinking about this more, this feels like the same problem replicated one level downward: Confluent has a ton of config knobs on their SchemaRegistryClient so we shouldn't try to duplicate its API; instead the user configures it and then injects it for use. Then we don't have to duplicate config arguments, etc.

Perhaps this class should be a ConfluentSerializer and it takes a fully configured {Avro,Protobuf,Json}Serializer and just includes the functionality of "when to call this in the dataflow". Then we don't have to duplicate the concept hierarchy.

Also a parallel ConfluentDeserializer.

Update: Ok see the next comment again before implementing.



class ConfluentSchemaRegistry:
"""Confluent's schema registry for Kafka's input and output connectors."""
class SchemaRegistry:
Contributor:

Given the above idea, it feels like this class should only be used where we want to "hand connect" schemas and {de,}serializers in the Redpanda case. Confluent already gives us all the tools to use / write magic bytes to select schemas.

Concretely, what if we had two modules:

  1. bytewax.connectors.kafka.confluent_serde if you want to use the magic bytes version. It has Confluent{Serializer,Deserializer} to which you inject fully configured confluent_kafka.schema_registry.avro.{AvroDeserializer,AvroSerializer} or the protobuf or JSON ones. You'll need to instantiate a confluent_kafka.schema_registry.SchemaRegistryClient to make those, but it's not used directly by any of the Bytewax code.

  2. bytewax.connectors.kafka.fixed_serde or something. It contains RawAvro{Serializer,Deserializer} which you instantiate with a confluent_kafka.schema_registry.Schema which you get out of a confluent_kafka.schema_registry.SchemaRegistryClient.get_schema you've configured separately. We can then add Raw{Protobuf,JSON,Etc}{Serializer,Deserializer} later with each of those deps.

This assumes that delegating the work of talking to schema registries to confluent_kafka.schema_registry.SchemaRegistryClient works with both Redpanda and Confluent registries, but the whole point of making this change was that it does. It also assumes that if you want a custom schema detection that is not what the Confluent client library provides or a static mapping, you're on your own.

Update: Ignore literally implementing this and see the suggestion below.

# sure to pass a dict to the serializer.
# Also, we can pass `None` as the ctx since our "subject.name.strategy"
# does not use it, the types are not properly specified in confluent_kafka.
return cast(bytes, self.serializer(obj, ctx=None))
Contributor:

Uff. Ok another thought looking through the Confluent examples. https://github.com/confluentinc/confluent-kafka-python/blob/5a87879681d28375a55203c4839338b13b668046/examples/avro_consumer.py#L101 We don't have a principled way to fill in the serde context object with this API design. That probably means that setting up "use the topic to determine the schema" and other things that depend on the context data won't work correctly.

This leads me to think we should eventually go for a larger API change where the serialization operator requires upstream to be shaped like def serialize(up: Stream[KafkaSinkMessage[dict, dict]], key_ser: confluent_kafka.serialization.Serializer, value_ser: confluent_kafka.serialization.Serializer) -> Stream[KafkaSinkMessage[bytes, bytes]] so that we can look at the destination topic, headers, etc. on a pre-prepared message and fill them in the context object and pass it in. A parallel version for the deserialization operator.


This then leads me to a final API simplification: we can standardize around the ABCs confluent_kafka.serialization.{Serializer,Deserializer}. Our serde operators could take those, and if you are using the Confluent magic bytes pattern, great, you build the confluent_kafka.schema_registry.avro.{AvroDeserializer,AvroSerializer} or the protobuf or JSON ones. Or, we provide some PlainAvroDeserializer which conforms to confluent_kafka.serialization.Deserializer (instead of any custom Bytewax ABC), and all the other permutations of Plain{Avro,Protobuf,JSON}{Serializer,Deserializer}, since those things don't come built into confluent_kafka.


Ok, so what does this look like in total?

Our serde operators look like:

def serialize(
    up: Stream[KafkaSinkMessage[dict, dict]],
    key_ser: confluent_kafka.serialization.Serializer,
    value_ser: confluent_kafka.serialization.Serializer,
) -> Stream[KafkaSinkMessage[bytes, bytes]]:

def deserialize(
    up: Stream[KafkaSourceMessage[bytes, bytes]],
    key_de: confluent_kafka.serialization.Deserializer,
    value_de: confluent_kafka.serialization.Deserializer,
) -> Stream[KafkaSourceMessage[dict, dict]]:

If you want to use Confluent and its magic bytes format, great. You re-use all the stuff (schema registry client, then Avro serializer, e.g.) in the confluent_kafka lib to instantiate the relevant confluent_kafka.serialization.{Serializer,Deserializer} impl. Then you use the above operators.

If you don't want to use the Confluent magic bytes thing, and instead want to statically pick a schema from a registry and write out raw Avro, also fine. You re-use the schema registry client from confluent_kafka to make a connection and pluck out a schema, then you pass that schema to one of our new class PlainAvroSerializer(confluent_kafka.serialization.Serializer) impls (also for all the other permutations). You then still use the above operators.

No need for our own ser/de ABC interfaces, no need for our own wrapper schema registry client. If someone has another need for "how to decide what schema to use" beyond the Confluent options or a static schema+format, they can write their own extension, or eventually we could pre-package it if there's a common enough use case. Or perhaps Redpanda will come up with a competing convention or something.
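To make the two paths concrete, a rough sketch (serialize and PlainAvroSerializer are the names proposed in this thread, not a released API; the URL, subject, and schema are placeholders):

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Works against Confluent or Redpanda registries (compatible APIs).
sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})

# Path 1: Confluent magic-bytes format -- reuse confluent_kafka's serializer as-is.
schema_str = '{"type": "record", "name": "SensorValue", "fields": [{"name": "temp", "type": "double"}]}'
wire_format_ser = AvroSerializer(sr_client, schema_str)

# Path 2: statically pick a schema from the registry and write raw Avro with the
# proposed PlainAvroSerializer that implements confluent_kafka.serialization.Serializer.
registered = sr_client.get_latest_version("sensor-value")
plain_ser = PlainAvroSerializer(registered.schema.schema_str)  # hypothetical class from this thread

# Either serializer would then be handed to the proposed serialize operator:
#   serialize(up, key_ser=..., value_ser=wire_format_ser)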


Other minor details.

  • Types It's still the user's job to figure out how to turn their model classes into dicts for serialization. Also technically confluent_kafka.schema_registry.avro.AvroDeserializer includes a from_dict argument which could do the transformation for you, but I think because that lib isn't typed it's more confusing to attempt to use that.

  • Optional Keys I think you could use @overload to have a version where key_ser is optional and that changes the return type to be Stream[KafkaSinkMessage[None, bytes]]. (Sketched below.)
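Roughly how that @overload idea might look (a sketch only; serialize, Stream, and KafkaSinkMessage are the proposed/Bytewax names discussed above, and the exact import paths are assumptions):

from typing import overload

import confluent_kafka.serialization as cks
from bytewax.dataflow import Stream
from bytewax.connectors.kafka import KafkaSinkMessage

@overload
def serialize(
    up: Stream[KafkaSinkMessage[None, dict]],
    *,
    value_ser: cks.Serializer,
) -> Stream[KafkaSinkMessage[None, bytes]]: ...

@overload
def serialize(
    up: Stream[KafkaSinkMessage[dict, dict]],
    *,
    key_ser: cks.Serializer,
    value_ser: cks.Serializer,
) -> Stream[KafkaSinkMessage[bytes, bytes]]: ...

def serialize(up, *, key_ser=None, value_ser):
    # The actual implementation would dispatch on whether key_ser was supplied.
    ...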

Psykopear (author):

Yep, I think this makes sense. I initially didn't want to use Confluent's interfaces outright; my hope was to keep everything much more decoupled from the library than it currently is, as I also assumed Redpanda's integration would require a different approach. At this point it makes sense to embrace it, though. I'm working on it.

Contributor:

Yeah, I didn't either, but I didn't understand at first how tightly coupled the "Kafka context" would be to the per-message serialization options. Generalizing a bit more would make sense if we wanted to possibly use this logic outside of Kafka, but I now think that is not the right move.


davidselassie (Contributor) left a comment:

Here are some more ideas for how to update this API to hopefully work with what Confluent gives us, but provide the right kind of hooks and conveniences. I went down a rabbit hole to come to the ideas at the end, but I left my intermediate comments in order so you can see what I was thinking to get to that last set of ideas. I think the PR keeps the comments in the order I created them in the "conversation" view.

tl;dr: Try to re-use even more of confluent_kafka so we don't have to bridge config. Re-use their serde interface. To re-use it correctly, we have to limit the location of serde to be on whole Kafka messages.

Psykopear (author):

Update: Actually see the next comment before answering.

Update: Ok see the next comment again before implementing.

Update: Ignore literally implementing this and see the suggestion below.

Eheh, I know the feeling; while writing this I had to revisit my assumptions more than once 😄 I added a comment to your last revision, though. At this point I agree that it makes sense to shape the integration directly on Confluent's library and just have an escape hatch for the few things we want to allow users to do differently.

Psykopear force-pushed the schema-registries-fixes branch 3 times, most recently from 24ba8f5 to 5892001, on February 29, 2024 at 14:44.
davidselassie (Contributor) left a comment:

Looks good. Minor comments on documentation.

(Resolved review threads on docs/articles/concepts/kafka.md and pysrc/bytewax/connectors/kafka/serde.py.)
def ser(self, obj: SerdeIn) -> SerdeOut:
"""Serialize an object."""
...
def __init__(self, schema_str: str):
Contributor:

It might be nice to have this match the constructor of the Confluent AvroSerializer and handle not unpacking the Schema.

Suggested change:
- def __init__(self, schema_str: str):
+ def __init__(self, schema: Union[str, Schema]):

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#avroserializer also mentions you need to pass a Schema if the schema references other schemas within? Does fastavro.parse_schema handle this? Or does that mean that there are some schemas that will only work with the Confluent serde classes?

Psykopear (author):

Yes, I missed that, but fastavro does allow referencing schemas by name.
If you are reading schemas from a directory, load_schema automatically handles reading those.
You can also implement a light interface offered by fastavro to let it download schemas from a registry rather than looking at the directory (but you need to implement everything there).
Once you have all your referenced schemas parsed, you can pass them to parse_schema in a named_schemas dictionary.
We don't want to deal with retrieving the other schemas here, but it should be enough to proxy a named_schemas parameter through to parse_schema for that to work. Fixing it.
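Roughly, the named_schemas mechanics look like this (a sketch; the record names and schemas are made up for illustration):

import fastavro

# Parse the referenced schema first, collecting its named types.
named_schemas = {}
address = {
    "type": "record", "name": "example.Address",
    "fields": [{"name": "street", "type": "string"}],
}
fastavro.parse_schema(address, named_schemas=named_schemas)

# The main schema can now reference "example.Address" by name.
user = {
    "type": "record", "name": "example.User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "address", "type": "example.Address"},
    ],
}
parsed = fastavro.parse_schema(user, named_schemas=named_schemas)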

(Resolved review threads on pysrc/bytewax/connectors/kafka/serde.py.)
Comment on lines 17 to 24
"""Bytewax serializer that uses fastavro's serializer.

class SchemaSerializer(ABC, Generic[SerdeIn, SerdeOut]):
"""A serializer for a specific schema."""
Beware that using plain avro serializers means you can't
deserialize with `confluent_kafka` python library, as that
requires some metadata prepended to each message, and plain
avro doesn't do that.
If you plan to deserialize messages with `confluent_kafka` python
library, please use the `ConfluentAvroSerializer` instead.
Contributor:

I think the wording in this section needs to change a little bit with this current setup given that we are using the Confluent serde interface. Perhaps something like:

Unframed Avro serializer. Encodes into raw Avro.

This is in comparison to {py:obj}`confluent_kafka.serialization.AvroSerializer` which prepends magic bytes specifying the schema ID to the Avro payload. This serializer will not prepend those magic bytes. If downstream deserializers expect those magic bytes, use {py:obj}`~confluent_kafka.serialization.AvroSerializer` instead.
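For concreteness, a minimal sketch of what such an unframed serializer could look like while conforming to the confluent_kafka.serialization.Serializer interface (illustrative only, not the PR's actual implementation; error handling and named_schemas support omitted):

import io
import json

import fastavro
from confluent_kafka.serialization import Serializer


class PlainAvroSerializer(Serializer):
    """Encode dicts as raw (unframed) Avro using fastavro."""

    def __init__(self, schema_str: str):
        self.schema = fastavro.parse_schema(json.loads(schema_str))

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        out = io.BytesIO()
        fastavro.schemaless_writer(out, self.schema, obj)
        return out.getvalue()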

Comment on lines 39 to 45
"""Bytewax deserializer that uses fastavro's deserializer.

Beware that this can't deserialize messages serialized with
`confluent_kafka` python library, as that prepends some magic bytes
and the schema_id to each message, and plain avro doesn't do that.
If you want to deserialize messages serialized with `confluent_kafka`
python library, please use the `ConfluentAvroDeserializer` instead.
Contributor:

Unframed Avro deserializer. Decodes from raw Avro.

Requires you to manually specify the schema to use.

This is in comparison to {py:obj}`confluent_kafka.serialization.AvroDeserializer` which expects magic bytes preceding the actual Avro payload in the input. This deserializer _can not_ handle those bytes and will throw an exception. If upstream serializers are including magic bytes, use {py:obj}`~confluent_kafka.serialization.AvroDeserializer` instead.
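And a parallel sketch for the unframed deserializer, under the same assumptions (illustrative only):

import io
import json

import fastavro
from confluent_kafka.serialization import Deserializer


class PlainAvroDeserializer(Deserializer):
    """Decode raw (unframed) Avro bytes into dicts using fastavro."""

    def __init__(self, schema_str: str):
        self.schema = fastavro.parse_schema(json.loads(schema_str))

    def __call__(self, value, ctx=None):
        if value is None:
            return None
        return fastavro.schemaless_reader(io.BytesIO(value), self.schema)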

davidselassie (Contributor) commented Mar 6, 2024:

Sorry I used the wrong import path in those xrefs. Running cd docs; ./autobuild.sh showed the same errors. In the future if you want to debug Sphinx xref issues, you can use the incantations https://docs.bytewax.io/latest/articles/contributing/writing-docs.html#finding-reference-names to search through all the available options.

I pushed fixes, but I'll let you merge if you're done otherwise.

Psykopear and others added 8 commits March 7, 2024 10:12
Fixed docstrings
Made (de)serializer classes public too.
Co-authored-by: David Selassie <david@bytewax.io>
Signed-off-by: Federico Dolce <psykopear@gmail.com>
Co-authored-by: David Selassie <david@bytewax.io>
Signed-off-by: Federico Dolce <psykopear@gmail.com>
Psykopear (author):

Thanks @davidselassie. I added a line to the changelog and updated the examples; merging this.

Psykopear merged commit 8d50cb5 into main on Mar 7, 2024
27 checks passed
Psykopear deleted the schema-registries-fixes branch on March 7, 2024 at 10:02