Add Avro serializer tests
rnpridgeon committed Jan 5, 2019
1 parent 829277d commit f731b24
Showing 6 changed files with 226 additions and 41 deletions.
34 changes: 31 additions & 3 deletions confluent_kafka/avro/schema.py
@@ -1,3 +1,20 @@
#!/usr/bin/env python
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from avro.schema import PrimitiveSchema

# Python 2 considers int an instance of str
@@ -8,12 +25,23 @@


class GenericAvroRecord(dict):
"""
Pairs an Avro record with its schema.
:param schema schema: A parsed Avro schema.
:param dict record: An existing dict to wrap as a GenericAvroRecord.
:raises ValueError: If schema is None.
:returns: Avro record with its schema.
:rtype: GenericAvroRecord
"""
__slots__ = ['schema']

def __init__(self, schema, datum=None):
def __init__(self, schema, record=None):
if schema is None:
raise ValueError("schema must not be None")
self.schema = schema
if datum is not None:
self.update(datum)
if record is not None:
self.update(record)

def put(self, key, value):
self[key] = value
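For context, a minimal usage sketch of GenericAvroRecord as defined above. The schema and values are hypothetical and not part of this commit; avro.schema.Parse is the Python 3 avro parser (avro.schema.parse on Python 2):

import avro.schema

from confluent_kafka.avro.schema import GenericAvroRecord

# Hypothetical schema, for illustration only.
user_schema = avro.schema.Parse("""
{
    "type": "record",
    "name": "User",
    "namespace": "example",
    "fields": [{"name": "name", "type": "string"}]
}
""")

# Wrap an existing dict, or build the record up incrementally with put().
record = GenericAvroRecord(user_schema, {"name": "alice"})
record.put("name", "bob")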
124 changes: 99 additions & 25 deletions confluent_kafka/avro/serializer/__init__.py
@@ -24,7 +24,7 @@
import traceback

from confluent_kafka.avro.schema import GenericAvroRecord, get_schema
from confluent_kafka.avro import ClientError
from confluent_kafka.avro.error import ClientError

log = logging.getLogger(__name__)

@@ -47,7 +47,7 @@ def __new__(cls, message, is_key=False):
return super(SerializerError, cls).__new__(KeySerializerError, message)
return super(SerializerError, cls).__new__(ValueSerializerError, message)

def __init__(self, message):
def __init__(self, message, is_key=False):
self.message = message

def __repr__(self):
@@ -81,31 +81,101 @@ def __exit__(self, *args):
return False


def TopicNameStrategy(topic=None, is_key=False, schema=None):
"""
Constructs the subject name under which a schema is registered with the Confluent Schema Registry.
TopicNameStrategy returns the schema's subject in the form of <topic>-key or <topic>-value.
:param str topic: Topic name.
:param bool is_key: True if the subject is being registered for a message key.
:param schema schema: Parsed Avro schema. *Note* Not used by TopicNameStrategy.
:raises ValueError: If topic is unset.
:returns: The subject name with which to register the schema.
:rtype: str
"""
if topic is None:
raise ValueError("Topic must be set when using TopicNameStrategy")

return "-".join([topic, '-key' if is_key else '-value'])


def RecordNameStrategy(topic=None, is_key=False, schema=None):
"""
Constructs the subject name under which a schema is registered with the Confluent Schema Registry.
RecordNameStrategy returns the fully-qualified record name regardless of the topic.
Compatibility checks are performed against all records of the same name across all topics.
This strategy allows a topic to contain a mixture of different record types.
:param str topic: Topic name. *Note* Not used by RecordNameStrategy.
:param bool is_key: True if the subject is being registered for a message key. *Note* Not used by RecordNameStrategy.
:param schema schema: Parsed Avro schema.
:raises ValueError: If schema is not set.
:returns: The subject name with which to register the schema.
:rtype: str
"""
if schema is None:
raise ValueError("Schema must be set when using RecordNameStrategy")

return schema.fullname


def TopicRecordNameStrategy(topic=None, is_key=False, schema=None):
"""
Constructs the subject name under which a schema is registered with the Confluent Schema Registry.
TopicRecordNameStrategy returns the topic name followed by the fully-qualified record name.
Compatibility checks are performed against all records of the same name within the same topic.
Like RecordNameStrategy, mixed record types are allowed within a topic.
This strategy is more flexible in that records needn't be compliant across the cluster.
:param str topic: Topic name.
:param schema schema: Parsed Avro schema.
:param bool is_key: True if used by a key_serializer.
:raises ValueError: If topic or schema is unset.
:returns: The subject name with which to register the schema.
:rtype: str
"""
if not all([topic, schema]):
raise ValueError("Both Topic and Schema must be set when using TopicRecordNameStrategy")
return "-".join([topic, schema.fullname])


class AvroSerializer(object):
"""
Encodes Kafka messages as Avro, registering the schema with the Confluent Schema Registry.
:param CachedSchemaRegistryClient registry_client: Instance of CachedSchemaRegistryClient.
:param bool is_key: True if configured as a key_serializer.
:param func subject_strategy(topic, is_key, schema): Returns the subject name used when registering schemas.
"""

__slots__ = ["registry_client", "codec_cache", "is_key"]
__slots__ = ["registry_client", "codec_cache", "is_key", "subject_strategy"]

def __init__(self, registry_client, is_key=False):
def __init__(self, registry_client, is_key=False, subject_strategy=TopicNameStrategy):
self.registry_client = registry_client
self.codec_cache = {}
self.is_key = is_key
self.subject_strategy = subject_strategy

def __call__(self, topic, record):
"""
Given a GenericAvroRecord, encode it for the given topic.
The schema is registered with the subject of 'topic-value'
:param str topic: Topic name
:param GenericAvroRecord record: An object to serialize
:returns: Encoded record with schema ID as bytes
The schema is registered under the subject name returned by the configured subject strategy.
:param str topic: Topic name.
:param GenericAvroRecord record: An object to serialize.
:returns: Encoded record with schema ID as bytes.
:rtype: bytes
"""

if record is None:
return None

subject_suffix = '-key' if self.is_key else '-value'
subject = topic + subject_suffix
subject = self.subject_strategy(topic, self.is_key, get_schema(record))

schema_id = self.registry_client.register(subject, get_schema(record))
if not schema_id:
@@ -115,20 +185,19 @@ def __call__(self, topic, record):
if schema_id not in self.codec_cache:
self.codec_cache[schema_id] = self._get_encoder_func(get_schema(record))

return self._encode_record_with_schema_id(schema_id, record)
return self._encode(schema_id, record)

def _get_encoder_func(self, writer_schema):
if HAS_FAST:
return lambda record, fp: schemaless_writer(fp, writer_schema.to_json(), record)
writer = avro.io.DatumWriter(writer_schema)
return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))

def _encode_record_with_schema_id(self, schema_id, record):
def _encode(self, schema_id, datum):
"""
Encode a record with a given schema id. The record must
be a python dictionary.
Encode a datum with a given schema id.
:param int schema_id: Integer schema ID from the Schema Registry.
:param dict record: An object to serialize
:param object datum: An object to serialize
:returns: Encoded datum prefixed with the magic byte and schema ID.
@@ -154,12 +223,18 @@ def _encode_record_with_schema_id(self, schema_id, record):
outf.write(struct.pack('>bI', MAGIC_BYTE, schema_id))

# write the record to the rest of the buffer
writer(record, outf)
writer(datum, outf)
return outf.getvalue()
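
A hedged usage sketch of AvroSerializer as defined above; the registry URL and topic are hypothetical, and record is the GenericAvroRecord from the earlier sketch. The bytes returned follow the Confluent wire format: a 1-byte magic byte, the 4-byte big-endian schema ID (struct format '>bI'), then the Avro-encoded payload:

from confluent_kafka.avro import CachedSchemaRegistryClient

client = CachedSchemaRegistryClient('http://localhost:8081')  # hypothetical registry URL
serialize = AvroSerializer(client, subject_strategy=TopicRecordNameStrategy)

encoded = serialize("users", record)  # registers the schema, returns bytes
# encoded[0] is the magic byte; encoded[1:5] is the schema ID, big-endian.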


class AvroDeserializer(object):
"""
Decodes Kafka messages encoded by Confluent Schema Registry compliant Avro serializers.
:param CachedSchemaRegistryClient registry_client: Instance of CachedSchemaRegistryClient.
:param bool is_key: True if configured as a key deserializer.
:param schema reader_schema: Optional reader schema to use during deserialization.
"""
__slots__ = ["registry_client", "codec_cache", "is_key", "reader_schema"]

def __init__(self, registry_client, is_key=False, reader_schema=None):
@@ -168,22 +243,21 @@
self.is_key = is_key
self.reader_schema = reader_schema

def __call__(self, topic, message):
def __call__(self, topic, datum):
"""
Decode a message from kafka that has been encoded for use with
the schema registry.
:param str|bytes or None message: message key or value to be decoded
:returns: Decoded message contents.
Decode a datum from Kafka that has been encoded for use with the Confluent Schema Registry.
:param str|bytes or None datum: message key or value to be decoded.
:returns: Decoded message key or value contents.
:rtype: GenericAvroRecord
"""

if message is None:
if datum is None:
return None

if len(message) <= 5:
if len(datum) <= 5:
raise SerializerError("message is too small to decode")

with ContextStringIO(message) as payload:
with ContextStringIO(datum) as payload:
magic, schema_id = struct.unpack('>bI', payload.read(5))
if magic != MAGIC_BYTE:
raise SerializerError("message does not start with magic byte", self.is_key)
@@ -247,7 +321,7 @@ def decoder(p):
bin_decoder = avro.io.BinaryDecoder(p)
return avro_reader.read(bin_decoder)

if writer_schema.get_prop('type') is 'record':
if writer_schema.type == 'record':
self.codec_cache[schema_id] = record_decoder
else:
self.codec_cache[schema_id] = decoder
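The matching consumer-side sketch, under the same assumptions as above (reader_schema is optional; when omitted the writer schema is used to decode):

deserialize = AvroDeserializer(client)  # same CachedSchemaRegistryClient as above

decoded = deserialize("users", encoded)  # a GenericAvroRecord for record-type schemas
assert decoded["name"] == "bob"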
12 changes: 4 additions & 8 deletions confluent_kafka/avro/serializer/message_serializer.py
@@ -29,9 +29,7 @@
import avro.io

from confluent_kafka.avro import ClientError
from confluent_kafka.avro.serializer import (SerializerError,
KeySerializerError,
ValueSerializerError)
from confluent_kafka.avro.serializer import SerializerError

log = logging.getLogger(__name__)

@@ -96,7 +94,6 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
:returns: Encoded record with schema ID as bytes
:rtype: bytes
"""
serialize_err = KeySerializerError if is_key else ValueSerializerError

subject_suffix = ('-key' if is_key else '-value')
# get the latest schema for the subject
@@ -105,7 +102,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
schema_id = self.registry_client.register(subject, schema)
if not schema_id:
message = "Unable to retrieve schema id for subject %s" % (subject)
raise serialize_err(message)
raise SerializerError(message, is_key)

# cache writer
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
@@ -122,7 +119,6 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
:returns: Encoded record with schema ID as bytes.
:rtype: bytes
"""
serialize_err = KeySerializerError if is_key else ValueSerializerError

# use slow avro
if schema_id not in self.id_to_writers:
@@ -131,11 +127,11 @@
try:
schema = self.registry_client.get_by_id(schema_id)
if not schema:
raise serialize_err("Schema does not exist")
raise SerializerError("Schema does not exist", is_key)
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
except ClientError:
exc_type, exc_value, exc_traceback = sys.exc_info()
raise serialize_err(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
raise SerializerError(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)), is_key)

# get the writer
writer = self.id_to_writers[schema_id]
2 changes: 1 addition & 1 deletion confluent_kafka/producer.py
@@ -33,7 +33,7 @@ class Producer(_impl):
:param func key_serializer(topic, key): Converts key to bytes.
**note** serializers are responsible for handling NULL keys
:param func value_serializer(topic, value): Converts value to bytes.
**note** serializers are responsible for handling NULL keys
**note** serializers are responsible for handling NULL values
:param func error_cb(kafka.KafkaError): Callback for generic/global error events.
:param func stats_cb(json_str): Callback for statistics emitted every ``statistics.interval.ms``.
See https://github.com/edenhill/librdkafka/wiki/Statistics for more information.
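A sketch of how the serializers plug into this Producer, per the key_serializer/value_serializer parameters documented above; the broker address and exact constructor wiring are assumptions, not confirmed by this diff:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'},  # hypothetical broker
                    value_serializer=AvroSerializer(client))
producer.produce(topic='users', value=record)
producer.flush()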
