
Add generic SerDe support #502

Closed · wants to merge 14 commits into from

Conversation

rnpridgeon (Contributor)

I'm bad at naming things, open to discussion on PARTITION_UA and anything else you might find awkward. Unless it breaks compatibility or requires a lot of work to make compatible <3.

@rnpridgeon changed the title from "Serde" to "Add generic SerDe support" on Dec 5, 2018
"""
Create a new Kafka Consumer instance.

To avoid spontaneous calls from non-Python threads all callbacks will only be served upon
Contributor

This should be a remark/notice.

confluent_kafka/consumer.py (resolved)
confluent_kafka/consumer.py (resolved)
**note** deserializers are responsible for handling NULL keys
:param func value_deserializer(topic, value): Converts message value bytes to object.
**note** deserializers are responsible for handling NULL values
:param func on_commit(err, [partitions]): Callback used to indicate success or failure
Contributor

this makes it look like partitions is optional, which it is not.

rnpridgeon (Contributor Author), Dec 6, 2018

hm, I thought [stuff] implied list but I could see how this is a problem.

mhowlett (Contributor), Dec 6, 2018

we should add PEP 484 compatible type annotations across the library like so: https://mypy.readthedocs.io/en/latest/python2.html

Contributor Author

As long as it handles the str|byte compatibility issues between py2 and py3 I'm game
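For readers unfamiliar with the mypy link above, here is a hedged sketch of what the python-2-compatible, comment-style PEP 484 annotations could look like for the on_commit callback; the signature shown is illustrative, not code from this PR.

```python
from typing import List  # comment-style annotations still need the typing imports

from confluent_kafka import KafkaError, TopicPartition


def on_commit(err, partitions):
    # type: (KafkaError, List[TopicPartition]) -> None
    """Illustrative callback: log commit failures, assuming this signature."""
    if err is not None:
        print('Commit failed: {}'.format(err))
```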

confluent_kafka/consumer.py (resolved)
confluent_kafka/src/confluent_kafka.c (resolved)
tests/integration/integration_test.py (resolved)
tests/integration/integration_test.py (outdated, resolved)
tests/integration/integration_test.py (outdated, resolved)
tests/integration/integration_test.py (outdated, resolved)
super(Consumer, self).__init__(conf, on_commit=on_commit, stats_cb=stats_cb,
throttle_cb=throttle_cb, logger=logger)

def poll(self, timeout=-1.0, key_deserializer=None, value_deserializer=None):
mhowlett (Contributor), Dec 6, 2018

again, better to constrain how the library can be used here, I think, and not allow serializers to be specified in both the poll method and the constructor - fewer options are better unless the value is clear. I'd remove them from here, since in practice I don't think this option will ever be used.

Contributor Author

We can't maintain compatibility with the existing AvroProducer/Consumer without it. Since we have already provided this in the API we can't just rip it out without notice.

Contributor

this is a new class - I don't think ensuring users can swap out the old avro producer and consumer for this one has any priority - you're already breaking compatibility by requiring them to change classes.

Contributor Author

The idea actually is to get away from having a specific class for Avro. Instead this should be configurable using the serializer interface proposed in this PR. You are right though, per-request overrides don't make sense in the Consumer. You can't actually pass your own schema per poll() in the AvroConsumer either. I just got ahead of myself.

Contributor

Why are AvroProducer/AvroConsumer a bad idea?

Contributor Author

What makes it a good idea? I don't think it does anyone a great service to have serialization-type-specific consumers/producers as part of the library. As a specialized wrapper, sure, but not as part of the core library. When/if the SR starts to support JSON and Protobuf, should we add a Protobuf/JSON Consumer/Producer, or provide examples of how to use the generic SerDe interface to achieve these goals? I favor option 2 obviously, but I'm of course open to suggestion.
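As an illustration of option 2, a minimal sketch (function names are hypothetical) of how a JSON format could plug into the generic (topic, value) serde signature used in this PR, with no format-specific Producer/Consumer class:

```python
import json


def json_serializer(topic, value):
    # Serialize any JSON-compatible object to UTF-8 bytes; None passes through.
    return None if value is None else json.dumps(value).encode('utf-8')


def json_deserializer(topic, data):
    # Reverse of the above; deserializers are responsible for handling NULL values.
    return None if data is None else json.loads(data.decode('utf-8'))
```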

Contributor Author

this is a new class

Thank you for this @mhowlett, you are right and this got me thinking. The avro producer isn't actually allowing for serializer overrides so much as it's allowing you to optionally introduce a new datum writer by providing a new schema. Either way this is an example of the serializer's responsibility leaking into the producer itself, which is not what we want. Instead the serializer should handle this. What makes this weird for Avro is that the python implementation does not actually have this notion of Specific or even Generic record types. If you want a record you pass a dict with no schema tied to it. That said, this is a new class, and a new GenericRecord serializer (and a supporting GenericRecord type binding a schema to a dict) can also be introduced to handle this. In the end this will make for a much better Avro experience for everyone while also removing all these checks from the produce path.

Contributor

re: @edenhill's question. At one point with the .NET client, I was seriously considering separate producer/consumer and avro producer/consumer classes. The motivation for this was the observation that the serde interfaces are very simple, except for the requirements of the SR integration. It seemed reasonable to have P/Cs for simple serdes (which furthermore allowed for beautifully terse inline specification of them), applicable to most scenarios, and defer avro functionality elsewhere. Initially, I was thinking this could even be done with C# extension methods, not requiring the classes for avro (but that was not the case). But it turned out to work much better to have completely general serde delegate types (two of them, one async, one not) and have the structure @rnpridgeon is suggesting above. @rnpridgeon's comment about avro being too specific also reinforces this choice.

@rnpridgeon - I don't quite follow what you're saying, but broadly it sounds good and I look forward to reviewing it.

Contributor Author

per poll serializer overrides have been removed

on_delivery = callback

# parameter overrides take precedence over instance functions
if not key_serializer:
Contributor

as with the consumer, we want something else to happen if a serializer is not specified - exception, or value passthrough (probably the latter).

unlike the consumer, there is value in allowing serializers to be different on each produce. In python, doing it as you are seems OK to me, though I reserve the right to change my mind :-). However, the nomenclature for serializers passed into the constructor should be changed to imply they are defaults.

Contributor Author

What is the purpose of erroring out when serializers are not configured? The producer and consumer currently work without serializers without issue. I do not see a reason to break this.

if value_serializer:
value = value_serializer(topic, value)

super(Producer, self).produce(topic, value, key, partition, on_delivery=on_delivery,
Contributor

there is value in having the un-serialized values in the delivery reports, so I think this should be a feature of the library (though it has a cost, so it should be toggleable). However, you're throwing this information away here.

Contributor Author

We don't have access to the message state from python, which is one of the shortcomings of pulling this into python. What we can do however is provide several examples of delivery report callbacks which preserve the message contents pre-serialization, which was the plan. Alternatively we can push this down into the extension, set the message contents with the msg key/value and avoid the extra work. Either one is doable, although I think starting to pull things into python is nice. We could also wrap user-provided callbacks in a lambda and change the docstring.

Contributor

this seems like a required feature to me. I didn't understand your plan with the extension - would you get a pointer to the original python value, pass that via the opaque parameter to the produce method, and get the value again using this pointer in the delivery report?

rnpridgeon (Contributor Author), Dec 7, 2018

We can call the serializer and hand off the message data in two places.

  1. In the extension, which is not the approach we took
  2. In python, as we have done here. This means we do not have access to the message opaque pointer which is passed to produce, however.

If we went with option 1 we could simply provide the message state object with the data before calling the serializer. Alternatively we could make a copy of the data and pass that to the message state object (which would be our opaque pointer). Either way we have access to the existing message state and the opaque pointer in the extension, but not from python.

For option 2 we can use a closure to bring the original message contents with it, as we do in the avro-cli example.

https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro-cli.py#L103

Contributor Author

@mhowlett I've thought about this more and I'm not sure I agree that providing the unserialized payload is the correct move here. If we do this then the delivery report is reporting what we wanted to produce as opposed to what we actually did produce. That to me breaks the contract that the user has with the delivery report.

Using a lambda to carry the payload with your callback seems like the right move here. This ensures that the delivery report upholds its intended purpose while giving users the flexibility to cater to more specific use cases.
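A sketch of the closure/lambda approach described above, in the spirit of the avro-cli example linked earlier; the topic, value, and callback names are illustrative, not part of this PR.

```python
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})


def report(err, msg, original_value):
    # The delivery report still describes what was actually produced (msg),
    # while the closure carries the pre-serialization value alongside it.
    if err is not None:
        print('Delivery of {!r} failed: {}'.format(original_value, err))
    else:
        print('Delivered {!r} to {} [{}]'.format(
            original_value, msg.topic(), msg.partition()))


value = 'some value'
producer.produce('my_topic', value.encode('utf-8'),
                 on_delivery=lambda err, msg, obj=value: report(err, msg, obj))
producer.flush()
```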

Contributor

There's also the possibly undesired or unexpected reference to the original user object that would be kept alive and out of gc until the delivery report was triggered, which may have unwanted consequences (resource usage, etc.).

"""

# on_delivery is an alias for callback and takes precedence
if callback and not on_delivery:
Contributor

do we have a timeline for deprecating this?

Contributor Author

My understanding of proper feature deprecation is one major release, i.e. deprecation warning in 1.0, removal in 2.0.
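As an illustration of that deprecation path (not code from this PR, and the helper name is hypothetical), the legacy alias could be honored with a warning until the next major release:

```python
import warnings


def resolve_delivery_callback(on_delivery=None, callback=None):
    # Hypothetical helper: honor the legacy 'callback' alias but warn that it
    # is deprecated; on_delivery wins if both are supplied.
    if callback is not None and on_delivery is None:
        warnings.warn("'callback' is deprecated; use 'on_delivery' instead",
                      DeprecationWarning)
        return callback
    return on_delivery
```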


# Callbacks can be set in the conf dict or *ideally* as parameters.
# Handle both cases prior to passing along to _impl
# If callbacks are configured in both places parameter values take precedence.
Contributor

I think it is fine to enforce the callback being set in only one place, any other use is ambiguous.
Let's follow Matt's rule of least surprise: enforce the callback is only specified in one place.
I don't consider that a breaking API change, it will only break bad apps.
It is literally just a couple of lines of code, something like:

  for var, name in [(logger, 'logger'), (on_commit, 'on_commit'), ...]:
      if var is not None and conf.get(name, None) is not None:
          raise ...


Consumes a message, triggers callbacks, returns an event.

The application must check the returned Message object’s Message.error() method to distinguish
between proper messages, an error(see error().code() for specifics), or an event.
Contributor

"an error (see msg.error().code() for specifics)"

confluent_kafka/consumer.py (outdated, resolved)
confluent_kafka/consumer.py (resolved)

if value_deserializer:
msg.set_value(value_deserializer(topic, msg.value()))

Contributor

what are the performance implications of this wrapping and extra checks? (for both consumer and producer)
I want numbers!

Contributor

Not sure about the details, but it seems like we might ideally want a super fast path for (at least) UTF8, int and long where the deserialization happens in C. Have you investigated a three-pronged approach (no/fast/custom deserialization) along these lines? I think we might get string for free with python 2, but not python 3... not an expert though and haven't investigated.

Contributor Author

All Python objects are of type PyObject and will need to be converted to their C counterparts, so I'm not sure I understand what you mean here. Even bytes are of type PyBytes in Py3, so I'm not sure you get anything for free in either version.

https://docs.python.org/2/c-api/string.html

:param func callback(confluent_kafka.KafkaError, confluent_kafka.Message):
See on_delivery.
:param int timestamp: Message timestamp (CreateTime) in milliseconds since epoch UTC
(requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0).
Contributor

We should probably bump MIN_RD_KAFKA_VERSION to v1.0.0, can you do that? And remove doc references to older versions.

The smaller the support matrix, the better.



class GenericAvroRecord(dict):
"""
Contributor

Consider composition instead of inheritance here because (1) it's a looser form of coupling (allows for more future flexibility), so is generally a better default, and (2) composition is used in other languages, e.g. https://github.com/confluentinc/avro/blob/master/lang/csharp/src/apache/main/Generic/GenericRecord.cs

Contributor Author

This is actually quite intentional as it allows me to bind the schema to the record without needing to dereference the record contents in the serializer. Since all records are represented as dicts and the datum writer will only accept a dict, I think this is the right move for python.

Contributor

I don't quite understand what you're trying to say, but there would be only a minor difference in code between the two approaches (the composition approach would be a bit more, but not much). Anyway, I'm ok with the inheritance relationship here.

Contributor Author

I guess I'm saying: why call serialize(record.data) vs serialize(data)? I'm not convinced I gain anything from composition here. On the contrary, with composition users couldn't treat this GenericAvroRecord exactly as if it were a dictionary, just like they could before I introduced a new type, i.e. record['stuff'] = 'a' or record.get('stuff', None).

Contributor

yeah, I think it's fine. You could of course provide pass-through methods (which other implementations do), but there may also be some python syntax that doesn't work (haven't looked into it). serialize(record.data) vs serialize(data) is a non-issue since it's internal. Anyway, I'm happy enough with this, just pointing out the difference.

Contributor Author

Sure thing. The other thing that is nice is that your record types work exactly as they did in the past with no material changes. You can still use record.put and record[key] as you did previously, since all records are dicts.
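A minimal sketch of the dict-subclass idea being discussed (attribute and method names are illustrative, not necessarily the PR's final code): the record stays a plain dict to callers while carrying the schema the serializer needs.

```python
class GenericAvroRecord(dict):
    """A dict that also carries the Avro schema it should be written with."""
    __slots__ = ['schema']

    def __init__(self, schema, record=None):
        super(GenericAvroRecord, self).__init__(record or {})
        self.schema = schema

    def put(self, key, value):
        # Keeps the old record.put(...) style working alongside record[key] = value.
        self[key] = value
```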

Log messages will only be forwarded when ``client.poll()`` or ``producer.flush()`` are called.
:raises TypeError: If conf is not a dict.
"""
def __new__(cls, *args, **kwargs):
Contributor

Fancy. This seems like a good solution to the requirements to me, though the scenario is unique to python and I'm not 100% confident I'm not missing any downsides.

Contributor Author

It's not terribly easy to read but it does allow me to offer both APIs at almost no cost to those who wish to forego using the serializing producer/consumer

Contributor

Here's what the manual says on __new__:

__new__() is intended mainly to allow subclasses of immutable types (like int, str, or tuple) to customize instance creation. It is also commonly overridden in custom metaclasses in order to customize class creation.

this is neither of those use cases, which makes me wary. I still think it's justified as a performance optimization (but again, wary that there might be something important I don't know about). I'd be interested in the perf difference between using a byte deserializer and bypassing the DeserializingConsumer as you're doing now, as the former is more conservative.

rnpridgeon (Contributor Author), Feb 4, 2019

It is also commonly overridden in custom metaclasses in order to customize class creation.

Technically the C implementation is not a metaclass, but is this not an example of customizing class creation? Note we still return a producer class; it has been 'enriched' if you will, but not extended, so there really are no surprises. All of the changes are internal, including the serialize API.
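For readers following along, here is a stripped-down sketch of the pattern under discussion (the _BaseProducer stand-in and parameter names are hypothetical): __new__ hands back the plain class when no serializers are configured, so users who skip the serde layer pay essentially nothing.

```python
class _BaseProducer(object):
    """Stand-in for the existing C-backed producer (illustrative only)."""
    def __init__(self, conf):
        self.conf = conf


class Producer(_BaseProducer):
    def __new__(cls, conf, key_serializer=None, value_serializer=None):
        if key_serializer is None and value_serializer is None:
            # No serializers requested: return the plain producer. Since the
            # returned object is not an instance of cls, __init__ below is skipped.
            return _BaseProducer(conf)
        return super(Producer, cls).__new__(cls)

    def __init__(self, conf, key_serializer=None, value_serializer=None):
        super(Producer, self).__init__(conf)
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer
```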



class TestAvroSerializer(unittest.TestCase):
def setUp(self):
Contributor

It would be good also to supply off-the-shelf implementations for other common data types in this PR. Amongst other things, it will validate the generality of the approach taken.

Contributor

nit: setUp casing seems atypical to me, setup.

Contributor Author

ack
ack

mhowlett (Contributor) left a comment

I'm concerned we're locking in a sub-optimal API by not providing unserialized key/values in the delivery report in this PR.



class ValueSerializerError(SerializerError):
pass


class ContextStringIO(io.BytesIO):

I always found this thing funny... you do know that there is contextlib.closing, right?

Contributor Author

Nope, although to be honest I just reused the existing code. Patches welcome though :)
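For reference, a tiny sketch of the contextlib.closing alternative mentioned above, which avoids the custom ContextStringIO subclass; the magic-byte write is just an illustrative stand-in for the framing work done inside the serializer.

```python
import io
from contextlib import closing

# closing() gives BytesIO the same with-statement behavior without a subclass.
with closing(io.BytesIO()) as buf:
    buf.write(b'\x00')        # e.g. the magic byte that prefixes SR-framed payloads
    payload = buf.getvalue()
```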

elif isinstance(datum, int):
return PrimitiveSchema('int')
elif isinstance(datum, float):
return PrimitiveSchema('float')
Contributor

python floats are double precision, and if you don't use a 'double' primitive schema here, serialization will be lossy in most cases, which seems bad.

same with int - python seems to have unified ints to mean long in more recent versions of the language, and I think just using a 'long' primitive schema type for all int values is best.
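A quick, self-contained illustration of the lossiness being described: round-tripping a Python float through single precision (what an Avro 'float' schema would encode) does not give the value back, while double precision does.

```python
import struct

value = 0.1
single = struct.unpack('<f', struct.pack('<f', value))[0]   # 32-bit, like Avro 'float'
double = struct.unpack('<d', struct.pack('<d', value))[0]   # 64-bit, like Avro 'double'

print(single == value)  # False -> encoding Python floats as Avro 'float' loses precision
print(double == value)  # True  -> Avro 'double' matches Python's float exactly
```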

Contributor Author

I did forget to add the double, I'll add that in. I do not think we should be silently promoting floats. The avro implementation actually ends up calling write_long within write_int anyway so there is nothing to be gained there. It's much more straightforward to say an int is an int.

https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L300-L304

Contributor

python doesn't have any data type that corresponds to avro's float or avro's int.

python float is equivalent to avro double.

pre-python3, I think python ints are platform dependent. On my machine they're 64-bit (i.e. equivalent to avro long).

in python3, all integer values are arbitrary precision (i.e. there is no corresponding avro type, but the best choice is long; presumably the avro library has some error checking).

Contributor Author

This is true for floats on read but it absolutely differentiates between the float and double on write:

https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L316-L326

Since the Avro implementation differentiates in its treatment of floats and doubles, it's not really our place to say you really meant double. We have to work under the assumption that the developer using the library was aware that an avro float is 32 bits and that is why they made the conscious decision to declare a float.

I'm not super keen on the idea of upgrading generic ints either. This would be inconsistent with the handling of Avro Records with int fields, or even primitives that were specific primitives (providing the primitive schema beforehand). If you can accidentally write a 64-bit integer to an avro record field you can accidentally write a 64-bit int to a generic int.

I agree this is unfortunate and perhaps an improvement that can be made upstream. But until then we should continue to mark ints as ints I think.

Contributor

ahh whoops, would you believe I forgot the schema is specified. very good, stupid me, carry on.


class SerializerError(Exception):
"""Generic error from serializer package"""

def __init__(self, message):
def __new__(cls, message, is_key=False):
mhowlett (Contributor), Feb 1, 2019

I don't think you'd ever want to have different handlers for key vs value serialization errors, so I don't think you need two types - just SerializerError would be better I think, with a property to indicate whether it's a key or not.

Also, you've used __new__ again here. This seems like a bad pattern to me - the benefit of doing it your way is it makes usage terser, but in general, this is at the expense of giving up on the idea of knowing what you're going to get on instantiation without looking at the implementation. I'm not familiar with working in languages that have this capability, but can't see how this tradeoff is a good one.

Contributor Author

Bear with me here as I know this goes against all my other arguments where I say I don't want to let Avro dictate python's serde API... Where it makes sense I'm trying to make the new serde api match the behavior of the existing Avro Producer and Consumer. For this reason we differentiate between a key and value serializer. I personally do not have strong feelings for either approach. If you on the other hand feel strongly that this should just be one exception type we can back it out.

As for __new__, I honestly don't feel this is too terrible when the resulting object is still the same class. There are no real surprises here: I wanted a serialization exception, I got one. This one is just a tad more specific than the other, but they will both be caught by except SerializerError. If we decide to just return one SerializerError type, however, there is no need for this at all. Anyway let me know what you think; as I said I don't have particularly strong feelings either way. Now that you know my intent I'll defer to your better judgement here.
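For comparison, a sketch of the single-exception alternative suggested above (one SerializerError carrying an is_key flag); illustrative only, not the code in this PR.

```python
class SerializerError(Exception):
    """Generic error from the serializer package."""

    def __init__(self, message, is_key=False):
        super(SerializerError, self).__init__(message)
        self.is_key = is_key  # True if the failure happened while (de)serializing the key

# Callers then handle key and value failures with a single except clause and
# check e.is_key only when the distinction matters.
```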

self.is_key = is_key
self.reader_schema = reader_schema

def __call__(self, topic, datum):
Contributor

datum is not a good name for the serialized data, as it suggests the actual object itself (especially in avro terminology). maybe payload?

from confluent_kafka.avro.schema import GenericAvroRecord
from confluent_kafka import avro

def b64_decoder(topic, data):
Contributor

do you plan to add a standard set of serializers (for int, long, utf8, base64)? Even just the need for the topic parameter in the serde methods screws up any hope of being able to beautifully specify these inline, increasing the benefit of supplying this

Contributor Author

I didn't plan to originally, no, as it's fairly trivial to derive your own serde implementations. Presumably these are serdes that ship with .NET now?
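For the record, the kind of off-the-shelf serdes being asked about are small; a hedged sketch (function names are hypothetical) matching the (topic, data) signature used in this PR:

```python
import base64
import struct


def utf8_serializer(topic, value):
    return None if value is None else value.encode('utf-8')


def utf8_deserializer(topic, data):
    return None if data is None else data.decode('utf-8')


def long_serializer(topic, value):
    # Big-endian 64-bit framing, as other Kafka clients commonly use for longs.
    return None if value is None else struct.pack('>q', value)


def b64_deserializer(topic, data):
    return None if data is None else base64.b64decode(data)
```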

ferozed (Contributor) commented Sep 30, 2019

hi! When will this PR be merged?

rnpridgeon (Contributor Author)

duplicate of #787

@rnpridgeon closed this on Apr 7, 2020