-
Notifications
You must be signed in to change notification settings - Fork 56
/
encoder.py
531 lines (456 loc) · 18.6 KB
/
encoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
from __future__ import division
import re
import math
import struct
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from functools import wraps
from datetime import datetime, date, time, tzinfo
from io import BytesIO
from sys import modules
from .compat import (
iteritems, long, int2bytes, unicode, as_unicode, pack_float16,
unpack_float16)
from .types import (
CBOREncodeTypeError, CBOREncodeValueError, CBORTag, undefined,
CBORSimpleValue, FrozenDict)
def shareable_encoder(func):
    """
    Decorate an encoder function so that it is always invoked through the
    encoder's :meth:`~CBOREncoder.encode_shared` bookkeeping.

    The decorated callable keeps the ``(encoder, value)`` calling convention
    but, instead of running ``func`` itself, hands ``func`` and ``value`` over
    to ``encoder.encode_shared``. That method decides whether ``func`` is
    actually executed, whether a shared-value marker or back-reference is
    written instead, or whether a cyclic structure must be rejected.

    :param func:
        the encoder function to wrap; called as ``func(encoder, value)``
    :return:
        a wrapper carrying the metadata of ``func`` that returns ``None``
    """
    def delegate_to_shared(encoder, value):
        # The result of encode_shared() is intentionally discarded
        encoder.encode_shared(func, value)

    return wraps(func)(delegate_to_shared)
class CBOREncoder(object):
    """
    The CBOREncoder class implements a fully featured `CBOR`_ encoder with
    several extensions for handling shared references, big integers, rational
    numbers and so on. Typically the class is not used directly, but the
    :func:`dump` and :func:`dumps` functions are called to indirectly construct
    and use the class.

    When the class is constructed manually, the main entry points are
    :meth:`encode` and :meth:`encode_to_bytes`.

    :param fp:
        the object to write the encoded data to; it must expose a callable
        ``write`` attribute
    :param bool datetime_as_timestamp:
        set to ``True`` to serialize datetimes as UNIX timestamps (this makes
        datetimes more concise on the wire, but loses the timezone information)
    :param datetime.tzinfo timezone:
        the default timezone to use for serializing naive datetimes; if this is
        not specified naive datetimes will raise a
        :exc:`CBOREncodeValueError` when encoding is attempted
    :param bool value_sharing:
        set to ``True`` to allow more efficient serializing of repeated values
        and, more importantly, cyclic data structures, at the cost of an extra
        marker tag written in front of every shareable container
    :param default:
        a callable that is called by the encoder with two arguments (the
        encoder instance and the value being encoded) when no suitable encoder
        has been found, and should use the methods on the encoder to encode any
        objects it wants to add to the data stream
    :param bool canonical:
        when True, use "canonical" CBOR representation; this typically involves
        sorting maps, sets, etc. into a pre-determined order ensuring that
        serializations are comparable without decoding
    :param bool date_as_datetime: set to ``True`` to serialize date objects as
        datetimes (CBOR tag 0), which was the default behavior in previous
        releases (cbor2 <= 4.1.2).

    .. _CBOR: https://cbor.io/
    """

    __slots__ = (
        'datetime_as_timestamp', '_timezone', '_default', 'value_sharing',
        '_fp_write', '_shared_containers', '_encoders', '_canonical')

    def __init__(self, fp, datetime_as_timestamp=False, timezone=None,
                 value_sharing=False, default=None, canonical=False,
                 date_as_datetime=False):
        # fp, timezone and default go through their validating property setters
        self.fp = fp
        self.datetime_as_timestamp = datetime_as_timestamp
        self.timezone = timezone
        self.value_sharing = value_sharing
        self.default = default
        self._canonical = canonical
        self._shared_containers = {}  # indexes used for value sharing
        # Per-instance copy: _find_encoder() caches resolved types in here
        self._encoders = default_encoders.copy()
        if canonical:
            self._encoders.update(canonical_encoders)
        if date_as_datetime:
            self._encoders[date] = CBOREncoder.encode_date

    def _find_encoder(self, obj_type):
        """
        Look for an encoder registered for a base class of ``obj_type``.

        Registry keys that are ``(module name, type name)`` tuples are
        resolved against already imported modules on the way; resolved types
        replace their tuple key in the registry. A match is cached under
        ``obj_type`` so that the next lookup is a direct dictionary hit.

        :param obj_type: the class of the object being encoded
        :return: the encoder callable, or ``None`` if nothing matches
        :raises CBOREncodeValueError: if a tuple key is not a 2-tuple
        """
        # Iterate over a snapshot because the registry is modified in the loop
        for type_, enc in list(iteritems(self._encoders)):
            if type(type_) is tuple:
                try:
                    modname, typename = type_
                except (TypeError, ValueError):
                    raise CBOREncodeValueError(
                        "invalid deferred encoder type {!r} (must be a "
                        "2-tuple of module name and type name, e.g. "
                        "('collections', 'defaultdict'))".format(type_))

                # Only modules that have already been imported are considered
                imported_type = getattr(modules.get(modname), typename, None)
                if imported_type is not None:
                    del self._encoders[type_]
                    self._encoders[imported_type] = enc
                    type_ = imported_type
                else:  # pragma: nocover
                    continue

            if issubclass(obj_type, type_):
                self._encoders[obj_type] = enc
                return enc

        return None

    @property
    def fp(self):
        """The object whose bound ``write`` method receives the output."""
        return self._fp_write.__self__

    @fp.setter
    def fp(self, value):
        try:
            if not callable(value.write):
                raise ValueError('fp.write is not callable')
        except AttributeError:
            raise ValueError('fp object has no write method')
        else:
            # Keep the bound method to skip an attribute lookup on each write
            self._fp_write = value.write

    @property
    def timezone(self):
        """The timezone applied to naive datetimes (``None`` if unset)."""
        return self._timezone

    @timezone.setter
    def timezone(self, value):
        if value is None or isinstance(value, tzinfo):
            self._timezone = value
        else:
            raise ValueError('timezone must be None or a tzinfo instance')

    @property
    def default(self):
        """The fallback encoder callable (``None`` if unset)."""
        return self._default

    @default.setter
    def default(self, value):
        if value is None or callable(value):
            self._default = value
        else:
            raise ValueError('default must be None or a callable')

    @property
    def canonical(self):
        """Whether the encoder was created with ``canonical=True``."""
        return self._canonical

    @contextmanager
    def disable_value_sharing(self):
        """
        Disable value sharing in the encoder for the duration of the context
        block.

        The previous setting is restored when the block exits, including when
        it exits because of an exception.
        """
        old_value_sharing = self.value_sharing
        self.value_sharing = False
        try:
            yield
        finally:
            self.value_sharing = old_value_sharing

    def write(self, data):
        """
        Write bytes to the data stream.

        :param bytes data:
            the bytes to write
        """
        self._fp_write(data)

    def encode(self, obj):
        """
        Encode the given object using CBOR.

        The encoder is picked by exact type first, then by base class via
        :meth:`_find_encoder`, and finally the ``default`` callable is used.

        :param obj:
            the object to encode
        :raises CBOREncodeTypeError:
            if no encoder is found and no ``default`` callable is set
        """
        obj_type = obj.__class__
        encoder = (
            self._encoders.get(obj_type) or
            self._find_encoder(obj_type) or
            self._default
        )
        if not encoder:
            raise CBOREncodeTypeError(
                'cannot serialize type %s' % obj_type.__name__)

        encoder(self, obj)

    def encode_to_bytes(self, obj):
        """
        Encode the given object to a byte buffer and return its value as bytes.

        This method was intended to be used from the ``default`` hook when an
        object needs to be encoded separately from the rest but while still
        taking advantage of the shared value registry.

        The encoder's output object is swapped for a temporary buffer while
        ``obj`` is encoded and is put back afterwards, including when the
        encoding fails.

        :param obj: the object to encode
        :return: the encoded form of ``obj``
        :rtype: bytes
        """
        with BytesIO() as fp:
            old_fp = self.fp
            self.fp = fp
            try:
                self.encode(obj)
            finally:
                # Never leave the encoder writing into the temporary buffer
                self.fp = old_fp

            return fp.getvalue()

    def encode_shared(self, encoder, value):
        """
        Run ``encoder`` on ``value`` with shared-value / cycle bookkeeping.

        With value sharing enabled, the first occurrence of ``value`` (by
        identity) is prefixed with tag 28 and remembered under the next free
        index; later occurrences are written as tag 29 followed by that index
        and ``encoder`` is not called. With value sharing disabled, ``value``
        is only tracked while it is being encoded so that meeting it again
        inside itself is reported as an error.

        :param encoder: callable invoked as ``encoder(self, value)``
        :param value: the container being encoded
        :raises CBOREncodeValueError:
            if a cycle is found while value sharing is disabled
        """
        value_id = id(value)
        try:
            index = self._shared_containers[value_id][1]
        except KeyError:
            if self.value_sharing:
                # Mark the container as shareable
                self._shared_containers[value_id] = (
                    value, len(self._shared_containers)
                )
                self.encode_length(6, 0x1c)
                encoder(self, value)
            else:
                self._shared_containers[value_id] = (value, None)
                try:
                    encoder(self, value)
                finally:
                    del self._shared_containers[value_id]
        else:
            if self.value_sharing:
                # Generate a reference to the previous index instead of
                # encoding this again
                self.encode_length(6, 0x1d)
                self.encode_int(index)
            else:
                raise CBOREncodeValueError(
                    'cyclic data structure detected but value sharing is '
                    'disabled')

    def encode_length(self, major_tag, length):
        """
        Write an initial byte for ``major_tag`` carrying ``length``.

        The shortest of the inline, 1, 2, 4 or 8 byte big-endian forms that
        can hold ``length`` is used.

        :param int major_tag: the major type, placed in the top three bits
        :param int length: the non-negative argument value to write
        """
        major_tag <<= 5
        if length < 24:
            self._fp_write(struct.pack('>B', major_tag | length))
        elif length < 256:
            self._fp_write(struct.pack('>BB', major_tag | 24, length))
        elif length < 65536:
            self._fp_write(struct.pack('>BH', major_tag | 25, length))
        elif length < 4294967296:
            self._fp_write(struct.pack('>BL', major_tag | 26, length))
        else:
            self._fp_write(struct.pack('>BQ', major_tag | 27, length))

    def encode_int(self, value):
        """
        Encode an integer as major type 0 or 1, or as a tagged byte string
        (tag 2 or 3) when it does not fit in 64 bits.
        """
        # Big integers (2 ** 64 and over)
        if value >= 18446744073709551616 or value < -18446744073709551616:
            if value >= 0:
                bignum_tag = 0x02
            else:
                bignum_tag = 0x03
                value = -value - 1

            payload = int2bytes(value)
            self.encode_semantic(CBORTag(bignum_tag, payload))
        elif value >= 0:
            self.encode_length(0, value)
        else:
            self.encode_length(1, -(value + 1))

    def encode_bytestring(self, value):
        """Encode ``value`` as a byte string (major type 2)."""
        self.encode_length(2, len(value))
        self._fp_write(value)

    def encode_bytearray(self, value):
        """Encode a bytearray the same way as an immutable byte string."""
        self.encode_bytestring(bytes(value))

    def encode_string(self, value):
        """Encode ``value`` as a UTF-8 text string (major type 3)."""
        encoded = value.encode('utf-8')
        # The length prefix counts encoded bytes, not characters
        self.encode_length(3, len(encoded))
        self._fp_write(encoded)

    @shareable_encoder
    def encode_array(self, value):
        """Encode a sequence as an array (major type 4), item by item."""
        self.encode_length(4, len(value))
        for item in value:
            self.encode(item)

    @shareable_encoder
    def encode_map(self, value):
        """Encode a mapping as a map (major type 5) in iteration order."""
        self.encode_length(5, len(value))
        for key, val in value.items():
            self.encode(key)
            self.encode(val)

    def encode_sortable_key(self, value):
        """
        Takes a key and calculates the length of its optimal byte
        representation, along with the representation itself. This is used as
        the sorting key in CBOR's canonical representations.

        :return: a ``(length, encoded bytes)`` tuple
        """
        encoded = self.encode_to_bytes(value)
        return len(encoded), encoded

    @shareable_encoder
    def encode_canonical_map(self, value):
        """
        Reorder keys according to Canonical CBOR specification.

        Entries are ordered by the length of the encoded key first and by the
        encoded bytes second. The already encoded key bytes are written out
        directly; the values are encoded normally.
        """
        self.encode_length(5, len(value))
        # Sort on the encoded key only, so that a tie never falls through to
        # comparing the raw keys or values, which may not be orderable
        keyed_items = sorted(
            ((self.encode_sortable_key(key), val)
             for key, val in value.items()),
            key=lambda item: item[0])
        for (_length, encoded_key), val in keyed_items:
            self._fp_write(encoded_key)
            self.encode(val)

    def encode_semantic(self, value):
        """Encode a tagged value: the tag number, then the wrapped value."""
        self.encode_length(6, value.tag)
        self.encode(value.value)

    #
    # Semantic encoders (major tag 6)
    #

    def encode_datetime(self, value):
        """
        Encode a datetime as an ISO 8601 string (tag 0), or as a numeric
        timestamp (tag 1) when ``datetime_as_timestamp`` is set.

        :raises CBOREncodeValueError:
            if ``value`` is naive and no default timezone has been set
        """
        # Semantic tag 0
        if not value.tzinfo:
            if self._timezone:
                value = value.replace(tzinfo=self._timezone)
            else:
                raise CBOREncodeValueError(
                    'naive datetime {!r} encountered and no default timezone '
                    'has been set'.format(value))

        if self.datetime_as_timestamp:
            from calendar import timegm
            timestamp = timegm(value.utctimetuple())
            if value.microsecond:
                # Only switch to a float when there is a fractional part
                timestamp = timestamp + value.microsecond / 1000000

            self.encode_semantic(CBORTag(1, timestamp))
        else:
            datestring = as_unicode(value.isoformat().replace('+00:00', 'Z'))
            self.encode_semantic(CBORTag(0, datestring))

    def encode_date(self, value):
        """
        Encode a date as the datetime for midnight of that day in the
        encoder's default timezone.
        """
        value = datetime.combine(value, time()).replace(tzinfo=self._timezone)
        self.encode_datetime(value)

    def encode_decimal(self, value):
        """
        Encode a decimal as ``[exponent, significand]`` under tag 4; NaN and
        the infinities are written as half-precision float specials.
        """
        # Semantic tag 4
        if value.is_nan():
            self._fp_write(b'\xf9\x7e\x00')
        elif value.is_infinite():
            self._fp_write(b'\xf9\x7c\x00' if value > 0 else b'\xf9\xfc\x00')
        else:
            dt = value.as_tuple()
            sig = 0
            for digit in dt.digits:
                sig = (sig * 10) + digit
            if dt.sign:
                sig = -sig
            # The two-element payload list must not be marked as shareable
            with self.disable_value_sharing():
                self.encode_semantic(CBORTag(4, [dt.exponent, sig]))

    def encode_rational(self, value):
        """Encode a rational as ``[numerator, denominator]`` under tag 30."""
        # Semantic tag 30
        with self.disable_value_sharing():
            self.encode_semantic(
                CBORTag(30, [value.numerator, value.denominator]))

    def encode_regexp(self, value):
        """Encode the pattern text of a compiled regex under tag 35."""
        # Semantic tag 35
        self.encode_semantic(CBORTag(35, as_unicode(value.pattern)))

    def encode_mime(self, value):
        """Encode a message rendered with ``as_string()`` under tag 36."""
        # Semantic tag 36
        self.encode_semantic(CBORTag(36, as_unicode(value.as_string())))

    def encode_uuid(self, value):
        """Encode the ``bytes`` form of a UUID under tag 37."""
        # Semantic tag 37
        self.encode_semantic(CBORTag(37, value.bytes))

    def encode_set(self, value):
        """Encode the members of a set as an array under tag 258."""
        # Semantic tag 258
        self.encode_semantic(CBORTag(258, tuple(value)))

    def encode_canonical_set(self, value):
        """
        Encode a set under tag 258 with its members ordered by the length and
        then the bytes of their encoded form.
        """
        # Semantic tag 258
        # Sort on the encoded form only; the members themselves may not be
        # orderable
        values = sorted(
            ((self.encode_sortable_key(key), key) for key in value),
            key=lambda item: item[0])
        self.encode_semantic(CBORTag(258, [item[1] for item in values]))

    def encode_ipaddress(self, value):
        """Encode the packed bytes of an IP address under tag 260."""
        # Semantic tag 260
        self.encode_semantic(CBORTag(260, value.packed))

    def encode_ipnetwork(self, value):
        """
        Encode an IP network under tag 261 as a one-entry map from the packed
        network address to the prefix length.
        """
        # Semantic tag 261
        self.encode_semantic(
            CBORTag(261, {value.network_address.packed: value.prefixlen}))

    #
    # Special encoders (major tag 7)
    #

    def encode_simple_value(self, value):
        """
        Encode a simple value (major type 7).

        Values below 24 fit into the initial byte. Larger values use the
        two-byte form; the two-byte form is never used for values below 24
        because such sequences are not well-formed CBOR.
        """
        if value.value < 24:
            self._fp_write(struct.pack('>B', 0xe0 | value.value))
        else:
            self._fp_write(struct.pack('>BB', 0xf8, value.value))

    def encode_float(self, value):
        """
        Encode a float as a double; NaN and the infinities are written as
        half-precision specials.
        """
        # Handle special values efficiently
        if math.isnan(value):
            self._fp_write(b'\xf9\x7e\x00')
        elif math.isinf(value):
            self._fp_write(b'\xf9\x7c\x00' if value > 0 else b'\xf9\xfc\x00')
        else:
            self._fp_write(struct.pack('>Bd', 0xfb, value))

    def encode_minimal_float(self, value):
        """
        Encode a float in the shortest of the double, single and half
        precision forms that reproduces ``value`` exactly.
        """
        # Handle special values efficiently
        if math.isnan(value):
            self._fp_write(b'\xf9\x7e\x00')
        elif math.isinf(value):
            self._fp_write(b'\xf9\x7c\x00' if value > 0 else b'\xf9\xfc\x00')
        else:
            # Try each encoding in turn from longest to shortest
            encoded = struct.pack('>Bd', 0xfb, value)
            for fmt, tag in [('>Bf', 0xfa), ('>Be', 0xf9)]:
                try:
                    new_encoded = struct.pack(fmt, tag, value)
                    # Check if encoding as low-byte float loses precision
                    if struct.unpack(fmt, new_encoded)[1] == value:
                        encoded = new_encoded
                    else:
                        break
                except struct.error:
                    # Catch the case where the 'e' format is not supported
                    new_encoded = pack_float16(value)
                    if (new_encoded and
                            unpack_float16(new_encoded[1:]) == value):
                        encoded = new_encoded
                    else:
                        break
                except OverflowError:
                    break

            self._fp_write(encoded)

    def encode_boolean(self, value):
        """Write the simple value for true (0xf5) or false (0xf4)."""
        self._fp_write(b'\xf5' if value else b'\xf4')

    def encode_none(self, value):
        """Write the simple value for null (0xf6)."""
        self._fp_write(b'\xf6')

    def encode_undefined(self, value):
        """Write the simple value for undefined (0xf7)."""
        self._fp_write(b'\xf7')
# Registry used by every CBOREncoder: maps a type to the unbound encoder
# method that serializes it. Each encoder instance works on its own copy.
#
# A key may also be a ``(module name, type name)`` tuple; such entries are
# resolved by CBOREncoder._find_encoder() against already imported modules,
# which avoids importing those modules here.
#
# The order of the entries matters for objects whose exact type is not a key:
# _find_encoder() walks the registry in order and uses the first entry that
# the object's type is a subclass of.
default_encoders = OrderedDict([
    (bytes, CBOREncoder.encode_bytestring),
    (bytearray, CBOREncoder.encode_bytearray),
    (unicode, CBOREncoder.encode_string),
    (int, CBOREncoder.encode_int),
    (long, CBOREncoder.encode_int),
    (float, CBOREncoder.encode_float),
    (('decimal', 'Decimal'), CBOREncoder.encode_decimal),
    # bool values get here through the exact-type lookup in encode()
    (bool, CBOREncoder.encode_boolean),
    (type(None), CBOREncoder.encode_none),
    (tuple, CBOREncoder.encode_array),
    (list, CBOREncoder.encode_array),
    (dict, CBOREncoder.encode_map),
    (defaultdict, CBOREncoder.encode_map),
    (OrderedDict, CBOREncoder.encode_map),
    (FrozenDict, CBOREncoder.encode_map),
    (type(undefined), CBOREncoder.encode_undefined),
    (datetime, CBOREncoder.encode_datetime),
    # The compiled pattern type is obtained from an instance
    (type(re.compile('')), CBOREncoder.encode_regexp),
    (('fractions', 'Fraction'), CBOREncoder.encode_rational),
    (('email.message', 'Message'), CBOREncoder.encode_mime),
    (('uuid', 'UUID'), CBOREncoder.encode_uuid),
    (('ipaddress', 'IPv4Address'), CBOREncoder.encode_ipaddress),
    (('ipaddress', 'IPv6Address'), CBOREncoder.encode_ipaddress),
    (('ipaddress', 'IPv4Network'), CBOREncoder.encode_ipnetwork),
    (('ipaddress', 'IPv6Network'), CBOREncoder.encode_ipnetwork),
    (CBORSimpleValue, CBOREncoder.encode_simple_value),
    (CBORTag, CBOREncoder.encode_semantic),
    (set, CBOREncoder.encode_set),
    (frozenset, CBOREncoder.encode_set),
])
# Overrides merged into an encoder's copy of default_encoders when it is
# created with ``canonical=True``: floats use the shortest exact encoding and
# maps and sets are written in sorted order.
#
# Every mapping and set type that has its own entry in default_encoders is
# repeated here so that the exact-type lookup picks up the canonical variant.
canonical_encoders = OrderedDict([
    (float, CBOREncoder.encode_minimal_float),
    (dict, CBOREncoder.encode_canonical_map),
    (defaultdict, CBOREncoder.encode_canonical_map),
    (OrderedDict, CBOREncoder.encode_canonical_map),
    (FrozenDict, CBOREncoder.encode_canonical_map),
    (set, CBOREncoder.encode_canonical_set),
    (frozenset, CBOREncoder.encode_canonical_set),
])
def dumps(obj, **kwargs):
    """
    Encode an object with CBOR and hand back the resulting bytes.

    The work is delegated to :func:`dump`, writing into an in-memory buffer
    that is closed before returning.

    :param obj: the object to serialize
    :param kwargs: keyword arguments passed to :class:`~.CBOREncoder`
    :return: the serialized output
    :rtype: bytes
    """
    with BytesIO() as buffer:
        dump(obj, buffer, **kwargs)
        # Copy the contents out while the buffer is still open
        serialized = buffer.getvalue()

    return serialized
def dump(obj, fp, **kwargs):
    """
    Encode an object with CBOR and write the result to ``fp``.

    A fresh :class:`~.CBOREncoder` is created for every call.

    :param obj: the object to serialize
    :param fp: a file-like object
    :param kwargs: keyword arguments passed to :class:`~.CBOREncoder`
    """
    encoder = CBOREncoder(fp, **kwargs)
    encoder.encode(obj)