-
Notifications
You must be signed in to change notification settings - Fork 879
/
avro.py
437 lines (347 loc) · 19.7 KB
/
avro.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from io import BytesIO
from json import loads
from struct import pack, unpack
from fastavro import (parse_schema,
schemaless_reader,
schemaless_writer)
from . import (_MAGIC_BYTE,
Schema,
topic_subject_name_strategy)
from confluent_kafka.serialization import (Deserializer,
SerializationError,
Serializer)
class _ContextStringIO(BytesIO):
    """
    In-memory binary buffer that can be the target of a ``with`` statement.

    Entering the context hands back the buffer itself; leaving the context
    closes it without suppressing any exception raised inside the block.
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Free the buffer, then report "not handled" so that any exception
        # raised in the with-body keeps propagating.
        self.close()
        return False
def _schema_loads(schema_str):
    """
    Instantiate a Schema instance from a declaration string.

    Args:
        schema_str (str): Avro Schema declaration. See the `Schema declaration
            <https://avro.apache.org/docs/current/spec.html#schemas>`_ section of
            the Avro specification.

    Returns:
        Schema: A Schema instance with ``schema_type='AVRO'``.

    Raises:
        ValueError: If ``schema_str`` is empty or contains only whitespace.
    """
    schema_str = schema_str.strip()
    if not schema_str:
        # Reject empty input explicitly instead of failing with an opaque
        # IndexError when the first character is inspected below.
        raise ValueError("schema_str must be a non-empty Avro schema declaration")

    # Bare (canonical form) primitive declarations such as '"string"' are
    # neither JSON objects nor arrays; wrap them as {"type": ...} so code that
    # loads the declaration as a dict can read its "type" key.
    if not schema_str.startswith(("{", "[")):
        schema_str = '{"type":' + schema_str + '}'

    return Schema(schema_str, schema_type='AVRO')
def _resolve_named_schema(schema, schema_registry_client, named_schemas=None):
    """
    Recursively collect the named types referenced by ``schema``.

    Each entry in ``schema.references`` is fetched from Schema Registry by
    subject and version. Its own references are resolved first, depth-first.
    It is then parsed with fastavro's ``parse_schema`` using the shared
    ``named_schemas`` dict.

    Args:
        schema (Schema): Schema whose ``references`` should be resolved.
        schema_registry_client (SchemaRegistryClient): Client used to fetch
            each referenced schema version.
        named_schemas (dict, optional): Accumulator shared across recursive
            calls. A fresh dict is created when omitted.

    Returns:
        dict: The accumulator (the same object that was passed in, if any).
    """
    resolved = {} if named_schemas is None else named_schemas

    references = schema.references
    if references is None:
        return resolved

    for reference in references:
        registered = schema_registry_client.get_version(reference.subject, reference.version)
        dependency = registered.schema
        # Resolve this reference's own dependencies before parsing it, so the
        # names it uses are already in the shared dict (assumed to be
        # populated by parse_schema via named_schemas).
        _resolve_named_schema(dependency, schema_registry_client, resolved)
        parse_schema(loads(dependency.schema_str), named_schemas=resolved)

    return resolved
class AvroSerializer(Serializer):
    """
    Serializer that outputs Avro binary encoded data with Confluent Schema Registry framing.

    Configuration properties:

    +---------------------------+----------+--------------------------------------------------+
    | Property Name             | Type     | Description                                      |
    +===========================+==========+==================================================+
    |                           |          | If True, automatically register the configured   |
    | ``auto.register.schemas`` | bool     | schema with Confluent Schema Registry if it has  |
    |                           |          | not previously been associated with the relevant |
    |                           |          | subject (determined via subject.name.strategy).  |
    |                           |          |                                                  |
    |                           |          | Defaults to True.                                |
    +---------------------------+----------+--------------------------------------------------+
    |                           |          | Whether to normalize schemas, which will         |
    | ``normalize.schemas``     | bool     | transform schemas to have a consistent format,   |
    |                           |          | including ordering properties and references.    |
    +---------------------------+----------+--------------------------------------------------+
    |                           |          | Whether to use the latest subject version for    |
    | ``use.latest.version``    | bool     | serialization.                                   |
    |                           |          |                                                  |
    |                           |          | WARNING: There is no check that the latest       |
    |                           |          | schema is backwards compatible with the object   |
    |                           |          | being serialized.                                |
    |                           |          |                                                  |
    |                           |          | Defaults to False.                               |
    +---------------------------+----------+--------------------------------------------------+
    |                           |          | Callable(SerializationContext, str) -> str       |
    |                           |          |                                                  |
    | ``subject.name.strategy`` | callable | Defines how Schema Registry subject names are    |
    |                           |          | constructed. Standard naming strategies are      |
    |                           |          | defined in the confluent_kafka.schema_registry   |
    |                           |          | namespace.                                       |
    |                           |          |                                                  |
    |                           |          | Defaults to topic_subject_name_strategy.         |
    +---------------------------+----------+--------------------------------------------------+

    Schemas are registered against subject names in Confluent Schema Registry that
    define a scope in which the schemas can be evolved. By default, the subject name
    is formed by concatenating the topic name with the message field (key or value)
    separated by a hyphen.

    i.e. {topic name}-{message field}

    Alternative naming strategies may be configured with the property
    ``subject.name.strategy``.

    Supported subject name strategies:

    +--------------------------------------+------------------------------+
    | Subject Name Strategy                | Output Format                |
    +======================================+==============================+
    | topic_subject_name_strategy(default) | {topic name}-{message field} |
    +--------------------------------------+------------------------------+
    | topic_record_subject_name_strategy   | {topic name}-{record name}   |
    +--------------------------------------+------------------------------+
    | record_subject_name_strategy         | {record name}                |
    +--------------------------------------+------------------------------+

    See `Subject name strategy <https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy>`_ for additional details.

    Note:
        Prior to serialization, all values must first be converted to
        a dict instance. This may be handled manually prior to calling
        :py:func:`Producer.produce()` or by registering a `to_dict`
        callable with AvroSerializer.

        See ``avro_producer.py`` in the examples directory for example usage.

    Note:
        Tuple notation can be used to determine which branch of an ambiguous union to take.

        See `fastavro notation <https://fastavro.readthedocs.io/en/latest/writer.html#using-the-tuple-notation-to-specify-which-branch-of-a-union-to-take>`_

    Args:
        schema_registry_client (SchemaRegistryClient): Schema Registry client instance.

        schema_str (str or Schema):
            Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
            Accepts either a string or a :py:class:`Schema` instance. Note that string
            definitions cannot reference other schemas. For referencing other schemas,
            use a :py:class:`Schema` instance.

        to_dict (callable, optional): Callable(object, SerializationContext) -> dict. Converts object to a dict.

        conf (dict): AvroSerializer configuration.
    """  # noqa: E501
    __slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version',
                 '_known_subjects', '_parsed_schema',
                 '_registry', '_schema', '_schema_id', '_schema_name',
                 '_subject_name_func', '_to_dict', '_named_schemas']

    _default_conf = {'auto.register.schemas': True,
                     'normalize.schemas': False,
                     'use.latest.version': False,
                     'subject.name.strategy': topic_subject_name_strategy}

    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
        if isinstance(schema_str, str):
            schema = _schema_loads(schema_str)
        elif isinstance(schema_str, Schema):
            schema = schema_str
        else:
            raise TypeError('You must pass either schema string or schema object')

        self._registry = schema_registry_client
        # ID used to frame the most recently serialized record.
        self._schema_id = None
        # Maps each subject seen so far to the schema ID resolved for it, so
        # every subject is framed with its own ID (IDs can differ per subject,
        # e.g. with use.latest.version).
        self._known_subjects = {}

        if to_dict is not None and not callable(to_dict):
            raise ValueError("to_dict must be callable with the signature "
                             "to_dict(object, SerializationContext)->dict")

        self._to_dict = to_dict

        conf_copy = self._default_conf.copy()
        if conf is not None:
            conf_copy.update(conf)

        self._auto_register = conf_copy.pop('auto.register.schemas')
        if not isinstance(self._auto_register, bool):
            raise ValueError("auto.register.schemas must be a boolean value")

        self._normalize_schemas = conf_copy.pop('normalize.schemas')
        if not isinstance(self._normalize_schemas, bool):
            raise ValueError("normalize.schemas must be a boolean value")

        self._use_latest_version = conf_copy.pop('use.latest.version')
        if not isinstance(self._use_latest_version, bool):
            raise ValueError("use.latest.version must be a boolean value")
        if self._use_latest_version and self._auto_register:
            raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

        self._subject_name_func = conf_copy.pop('subject.name.strategy')
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        # Anything still left in the copy was not a recognized property.
        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}"
                             .format(", ".join(conf_copy.keys())))

        schema_dict = loads(schema.schema_str)
        self._named_schemas = _resolve_named_schema(schema, schema_registry_client)
        parsed_schema = parse_schema(schema_dict, named_schemas=self._named_schemas)

        if isinstance(parsed_schema, list):
            # if parsed_schema is a list, we have an Avro union and there
            # is no valid schema name. This is fine because the only use of
            # schema_name is for supplying the subject name to the registry
            # and union types should use topic_subject_name_strategy, which
            # just discards the schema name anyway
            schema_name = None
        else:
            # The Avro spec states primitives have a name equal to their type
            # i.e. {"type": "string"} has a name of string.
            # This function does not comply.
            # https://github.com/fastavro/fastavro/issues/415
            schema_name = parsed_schema.get("name", schema_dict["type"])

        self._schema = schema
        self._schema_name = schema_name
        self._parsed_schema = parsed_schema

    def __call__(self, obj, ctx):
        """
        Serializes an object to Avro binary format, prepending it with Confluent
        Schema Registry framing.

        Args:
            obj (object): The object instance to serialize.

            ctx (SerializationContext): Metadata pertaining to the serialization operation.

        Raises:
            SchemaRegistryError: If there was an error registering the schema with
                Schema Registry, or auto.register.schemas is
                false and the schema was not registered.

        Exceptions raised by ``to_dict`` or by fastavro while encoding the value
        are not caught here and propagate to the caller.

        Returns:
            bytes: Confluent Schema Registry encoded Avro bytes, or None if
            ``obj`` is None.
        """
        if obj is None:
            return None

        subject = self._subject_name_func(ctx, self._schema_name)
        if subject not in self._known_subjects:
            self._known_subjects[subject] = self._resolve_schema_id(subject)
        # Always frame with the ID cached for *this* subject; a single shared
        # ID would mislabel records when subjects resolve to different IDs.
        self._schema_id = self._known_subjects[subject]

        if self._to_dict is not None:
            value = self._to_dict(obj, ctx)
        else:
            value = obj

        with _ContextStringIO() as fo:
            # Write the magic byte and schema ID in network byte order (big endian)
            fo.write(pack('>bI', _MAGIC_BYTE, self._schema_id))
            # write the record to the rest of the buffer
            schemaless_writer(fo, self._parsed_schema, value)

            return fo.getvalue()

    def _resolve_schema_id(self, subject):
        """
        Look up the schema ID to use for ``subject`` according to the config.

        With ``use.latest.version`` the latest registered version's ID is used.
        Otherwise the configured schema is registered (``auto.register.schemas``)
        or looked up under the subject.

        Args:
            subject (str): Subject name produced by the subject name strategy.

        Returns:
            int: Schema ID reported by Schema Registry.
        """
        if self._use_latest_version:
            return self._registry.get_latest_version(subject).schema_id

        if self._auto_register:
            return self._registry.register_schema(subject,
                                                  self._schema,
                                                  self._normalize_schemas)

        registered_schema = self._registry.lookup_schema(subject,
                                                         self._schema,
                                                         self._normalize_schemas)
        return registered_schema.schema_id
class AvroDeserializer(Deserializer):
    """
    Deserializer for Avro binary encoded data with Confluent Schema Registry
    framing.

    Note:
        By default, Avro complex types are returned as dicts. This behavior can
        be overridden by registering a callable ``from_dict`` with the deserializer to
        convert the dicts to the desired type.

        See ``avro_consumer.py`` in the examples directory for example usage.

    Args:
        schema_registry_client (SchemaRegistryClient): Confluent Schema Registry
            client instance.

        schema_str (str, Schema, optional): Avro reader schema declaration. Accepts
            either a string or a :py:class:`Schema` instance. If not provided, the
            writer schema will be used as the reader schema. Note that string
            definitions cannot reference other schemas. For referencing other schemas,
            use a :py:class:`Schema` instance.

        from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
            Converts a dict to an instance of some object.

        return_record_name (bool): If True, when reading a union of records, the result will
                                   be a tuple where the first value is the name of the record and the second value is
                                   the record itself.  Defaults to False.

    See Also:
        `Apache Avro Schema Declaration <https://avro.apache.org/docs/current/spec.html#schemas>`_

        `Apache Avro Schema Resolution <https://avro.apache.org/docs/1.8.2/spec.html#Schema+Resolution>`_
    """
    __slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas', '_return_record_name', '_schema',
                 '_named_schemas']

    def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):
        schema = None
        if schema_str is not None:
            if isinstance(schema_str, str):
                schema = _schema_loads(schema_str)
            elif isinstance(schema_str, Schema):
                schema = schema_str
            else:
                raise TypeError('You must pass either schema string or schema object')

        self._schema = schema
        self._registry = schema_registry_client
        # Cache of parsed writer schemas keyed by Schema Registry schema ID.
        self._writer_schemas = {}

        if schema:
            schema_dict = loads(self._schema.schema_str)
            # Named types referenced by the reader schema.
            self._named_schemas = _resolve_named_schema(self._schema, schema_registry_client)
            self._reader_schema = parse_schema(schema_dict,
                                               named_schemas=self._named_schemas)
        else:
            self._named_schemas = None
            self._reader_schema = None

        if from_dict is not None and not callable(from_dict):
            raise ValueError("from_dict must be callable with the signature "
                             "from_dict(dict, SerializationContext) -> object")
        self._from_dict = from_dict

        self._return_record_name = return_record_name
        if not isinstance(self._return_record_name, bool):
            raise ValueError("return_record_name must be a boolean value")

    def __call__(self, data, ctx):
        """
        Deserialize Avro binary encoded data with Confluent Schema Registry framing to
        a dict, or object instance according to from_dict, if specified.

        Arguments:
            data (bytes): bytes

            ctx (SerializationContext): Metadata relevant to the serialization
                operation.

        Raises:
            SerializationError: If ``data`` is shorter than the 5-byte framing
                header or does not start with the expected magic byte.

        Exceptions raised by fastavro while decoding, or by ``from_dict``, are
        not caught here and propagate to the caller.

        Returns:
            object: If data is None, then None. Else, a dict, or object instance according
            to from_dict, if specified.
        """  # noqa: E501

        if data is None:
            return None

        # The framing header is 1 magic byte + a 4-byte big-endian schema ID.
        # The Avro body after it may legitimately be empty (a record with no
        # fields encodes to zero bytes), so only frames shorter than the
        # header itself are rejected.
        if len(data) < 5:
            raise SerializationError("Expecting data framing of length 5 bytes or "
                                     "more but total data size is {} bytes. This "
                                     "message was not produced with a Confluent "
                                     "Schema Registry serializer".format(len(data)))

        with _ContextStringIO(data) as payload:
            magic, schema_id = unpack('>bI', payload.read(5))
            if magic != _MAGIC_BYTE:
                raise SerializationError("Unexpected magic byte {}. This message "
                                         "was not produced with a Confluent "
                                         "Schema Registry serializer".format(magic))

            writer_schema = self._writer_schemas.get(schema_id)
            if writer_schema is None:
                writer_schema = self._load_writer_schema(schema_id)

            obj_dict = schemaless_reader(payload,
                                         writer_schema,
                                         self._reader_schema,
                                         self._return_record_name)

        if self._from_dict is not None:
            return self._from_dict(obj_dict, ctx)

        return obj_dict

    def _load_writer_schema(self, schema_id):
        """
        Fetch, parse and cache the writer schema registered under ``schema_id``.

        Args:
            schema_id (int): Schema ID read from the message framing.

        Returns:
            The fastavro-parsed writer schema, also stored in ``_writer_schemas``.
        """
        registered_schema = self._registry.get_schema(schema_id)
        # Resolve into a local dict so the reader schema's named types kept in
        # self._named_schemas are not replaced by the writer's.
        named_schemas = _resolve_named_schema(registered_schema, self._registry)
        prepared_schema = _schema_loads(registered_schema.schema_str)
        writer_schema = parse_schema(loads(prepared_schema.schema_str),
                                     named_schemas=named_schemas)
        self._writer_schemas[schema_id] = writer_schema
        return writer_schema