Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro consumer return schema #453

Closed

Conversation

saabeilin
Copy link

That's a second attempt to get the schema itself together with the decoded Avro message.
The motivation is described in #449

The previous approach was expecting the decoded message to be a dictionary which is not always true, for instance, it's quite common to have avro-encoded strings as keys. Here we create the wrapping class dynamically, subclassing from the type of the decoded value and adding the schema mixin.

@ghost
Copy link

ghost commented Sep 20, 2018

@confluentinc It looks like @saabeilin just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@rnpridgeon
Copy link
Contributor

Thanks again @saabeilin!

There are two concerns with this current approach:

  1. It can't be turned off
  2. It explodes the application's memory footprint significantly
    • For each Message we potentially need to allocate storage for the Message key and value schema. This schema is already cached by the CachedSchemaRegistryClient and shared across messages.

That said I think a better approach would be to enable the application to query the CachedSchemaRegistryClient's cache via some function. This has three major advantages:

  1. It's optional, user's only have to pay for the look up if they call the function
  2. It reduces schema storage duplication
  3. It brings the CachedSchemaRegistryClient closer to feature parity with the Java client 👍

@saabeilin
Copy link
Author

  1. Yes, and it's not optional in Java, for instance.
  2. Actually not that much. Before the object is modified, only the reference will be kept. It's easy to check by comparing schemas for two message with is. I did a brief check (we use an additional iterator-like wrapper around the consumer)
In [10]: from notifications.consumer import consumer

In [11]: m1 = next(consumer)

In [12]: m2 = next(consumer)

In [13]: m1 == m2, m1 is m2
Out[13]: (False, False)

In [14]: m1.value._schema == m2.value._schema, m1.value._schema is m2.value._schema
Out[14]: (True, True)

So from my point of view it's a rather cheap operation. In many situations the wrappers around will build some "real" Python objects based on schema and the dictionary.

The Registry client, itself, should be out of interest for most of applied developers. We deal with business logic and the registry client is something working under the hood, we just provide a schema so messages are serialized, and get the schema back.

@rnpridgeon
Copy link
Contributor

Ah right python passes immutable objects by reference until they are mutated sorry about that. Right so with that in mind I think your right this should work.

Can you add unit tests which cover wrapping primitive and record types. We should also add documentation for this feature.

@saabeilin
Copy link
Author

@rnpridgeon Ryan, do you think these tests are enough? What/where shall we put these in the docs?

@@ -59,6 +59,30 @@ def __exit__(self, *args):
return False


class HasSchemaMixin:
"""
Wraps a decoded Avro message dictionary to make able to add schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to ".. to add schema attribute"

else:
name = schema.type

klass = type(str(name), (value.__class__, HasSchemaMixin), {})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • isn't name already a string?
  • klass is swedish, use cls or similar.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we still support 2.7, is not name a formally unicode object?



def _wrap(value, schema):
if hasattr(schema, 'namespace'):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc string

@@ -213,4 +237,5 @@ def decode_message(self, message):
if magic != MAGIC_BYTE:
raise SerializerError("message does not start with magic byte")
decoder_func = self._get_decoder_func(schema_id, payload)
return decoder_func(payload)
decoded_message = _wrap(decoder_func(payload), self.registry_client.get_by_id(schema_id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the performance implications of this change?
It would be good with comparative numbers between master and this fix.

Also, there is no need for the temporary decoded_message variable, should return _wrap ..

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not do any performance testing, but my guess will be avro decoding will be the slowest part. On the other hand, in really many situations the message itself without the schema (or, at least, the full name of the schema which is usually the "message class") becomes close to useless.


def test_schema_mixin_wrapper(self):
schema = avro.loads(data_gen.BASIC_SCHEMA)
for kls in (int, float, dict, list):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

swedish klass

klass = type(str(name), (value.__class__, HasSchemaMixin), {})

wrapped = klass(value)
wrapped._schema = schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the unlikely, but possible, event that existing user's class has a ._schema field, this will silently overwrite it, and is as such a breaking change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but I do not have a better solution. An option (though a breaking change) would be returning a tuple (value, schema) instead, does it make sense?

@saabeilin
Copy link
Author

@rnpridgeon @edenhill Since you've closed #449 - I shall backport these changes to Kafkian. It currently relies on this branch for it to work.

@rnpridgeon
Copy link
Contributor

Let's keep this open until @edenhill chimes in but I think in the end both he and I agreed this should be an extension of the C message type.

I actually thought you had already merged this into Kafkian so closing it was a bit premature. I'll reopen it until we come into a consensus.

saabeilin added a commit to saabeilin/kafkian that referenced this pull request Oct 15, 2018
@@ -213,4 +249,7 @@ def decode_message(self, message):
if magic != MAGIC_BYTE:
raise SerializerError("message does not start with magic byte")
decoder_func = self._get_decoder_func(schema_id, payload)
return decoder_func(payload)
return _wrap(
Copy link
Contributor

@soxofaan soxofaan Oct 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saabeilin Wouldn't it be better that the _wrap call is moved inside the decoder closure of _get_decoder_func?
Then you don't have to do the self.registry_client.get_by_id(schema_id) calls on each record because the schema is already available at construction time of the decoder function inside _get_decoder_func.

@rnpridgeon
Copy link
Contributor

Generic Serde support will provide a means to bind your schema with your data. This is similar to java's 'GenericRecord' implementation. #502

@rnpridgeon rnpridgeon closed this Jan 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants