Permalink
Cannot retrieve contributors at this time
#!python | |
# -*- Python -*- | |
import datetime | |
import re | |
import struct | |
import sys | |
# True on Python 3 and later; selects between the py2 and py3 code paths below.
_IS_PY3 = sys.version_info[0] >= 3

if _IS_PY3:
    # CBOR is binary, so the in-memory stream must be bytes-based on Python 3.
    # BytesIO is bound to the historical StringIO name used by loads().
    from io import BytesIO as StringIO
else:
    try:
        from cStringIO import StringIO
    except ImportError:
        # Only a missing module should trigger the fallback; a bare except
        # would also hide unrelated failures such as KeyboardInterrupt.
        from StringIO import StringIO
# --- Layout of the initial byte of every CBOR item ---
CBOR_TYPE_MASK = 0xE0  # top 3 bits
CBOR_INFO_BITS = 0x1F  # low 5 bits

# --- Major types, already positioned in the top 3 bits ---
CBOR_UINT = 0x00
CBOR_NEGINT = 0x20
CBOR_BYTES = 0x40
CBOR_TEXT = 0x60
CBOR_ARRAY = 0x80
CBOR_MAP = 0xA0
CBOR_TAG = 0xC0
CBOR_7 = 0xE0  # float and other types

# --- Values of the low 5 bits that announce a following argument ---
CBOR_UINT8_FOLLOWS = 24  # 0x18
CBOR_UINT16_FOLLOWS = 25  # 0x19
CBOR_UINT32_FOLLOWS = 26  # 0x1a
CBOR_UINT64_FOLLOWS = 27  # 0x1b
CBOR_VAR_FOLLOWS = 31  # 0x1f

# Terminates a variable-length (indefinite) array, map or string.
CBOR_BREAK = 0xFF

# --- Complete initial bytes for the simple values and floats ---
CBOR_FALSE = (CBOR_7 | 20)
CBOR_TRUE = (CBOR_7 | 21)
CBOR_NULL = (CBOR_7 | 22)
CBOR_UNDEFINED = (CBOR_7 | 23)  # js 'undefined' value

CBOR_FLOAT16 = (CBOR_7 | 25)
CBOR_FLOAT32 = (CBOR_7 | 26)
CBOR_FLOAT64 = (CBOR_7 | 27)

# --- Tag numbers ---
CBOR_TAG_DATE_STRING = 0  # RFC3339
CBOR_TAG_DATE_ARRAY = 1  # any number type follows, seconds since 1970-01-01T00:00:00 UTC
CBOR_TAG_BIGNUM = 2  # big endian byte string follows
CBOR_TAG_NEGBIGNUM = 3  # big endian byte string follows
CBOR_TAG_DECIMAL = 4  # [ 10^x exponent, number ]
CBOR_TAG_BIGFLOAT = 5  # [ 2^x exponent, number ]

# Tags 21 and 22 used to be assigned to CBOR_TAG_BASE64URL / CBOR_TAG_BASE64
# and were then silently overwritten by the 33 / 34 assignments further down,
# which made the values 21 and 22 unreachable by name. They now have distinct
# names; CBOR_TAG_BASE64URL / CBOR_TAG_BASE64 keep their effective values.
# NOTE(review): the EXPECT_ prefix follows the "expected conversion" wording
# of the CBOR tag registry -- confirm the preferred naming.
CBOR_TAG_EXPECT_BASE64URL = 21
CBOR_TAG_EXPECT_BASE64 = 22
CBOR_TAG_BASE16 = 23
CBOR_TAG_CBOR = 24  # following byte string is embedded CBOR data

CBOR_TAG_URI = 32
CBOR_TAG_BASE64URL = 33
CBOR_TAG_BASE64 = 34
CBOR_TAG_REGEX = 35
CBOR_TAG_MIME = 36  # following text is MIME message, headers, separators and all
CBOR_TAG_CBOR_FILEHEADER = 55799  # can open a file with 0xd9d9f7

# Pre-packed single-byte header for a positive bignum (tag 2).
_CBOR_TAG_BIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_BIGNUM)
def dumps_int(val):
    """Return the CBOR encoding of the integer ``val`` as bytes.

    Non-negative values use the shortest unsigned form that fits (inline,
    or an 8/16/32/64-bit argument); anything larger is emitted as a tagged
    bignum byte string. Negative values are delegated to
    ``_encode_type_num`` as ``-1 - val`` under the CBOR_NEGINT major type.
    """
    if val < 0:
        return _encode_type_num(CBOR_NEGINT, -1 - val)

    # CBOR_UINT is 0, so the major type never needs OR-ing into the first byte.
    if val <= 23:
        return struct.pack('B', val)

    # (largest value that fits, struct format, info bits) from narrow to wide.
    sized_forms = (
        (0x0ff, 'BB', CBOR_UINT8_FOLLOWS),
        (0x0ffff, '!BH', CBOR_UINT16_FOLLOWS),
        (0x0ffffffff, '!BI', CBOR_UINT32_FOLLOWS),
        (0x0ffffffffffffffff, '!BQ', CBOR_UINT64_FOLLOWS),
    )
    for upper_bound, fmt, info_bits in sized_forms:
        if val <= upper_bound:
            return struct.pack(fmt, info_bits, val)

    # Wider than 64 bits: bignum tag, then a byte string holding the magnitude.
    magnitude = _dumps_bignum_to_bytearray(val)
    length_header = _encode_type_num(CBOR_BYTES, len(magnitude))
    return _CBOR_TAG_BIGNUM_BYTES + length_header + magnitude
def _dumps_bignum_to_bytearray(val):
    """Return the big-endian bytes of the non-negative integer ``val``.

    No leading zero byte is produced, so ``0`` (and any value that is not
    greater than zero) yields an empty byte string. A single implementation
    serves Python 2 and 3: ``bytes(bytearray)`` is ``str`` on Python 2 and
    ``bytes`` on Python 3.
    """
    out = bytearray()
    while val > 0:
        # Collect least-significant byte first; appending is O(1), whereas
        # inserting at index 0 shifts the whole buffer on every iteration.
        out.append(val & 0x0ff)
        val = val >> 8
    out.reverse()
    return bytes(out)
def dumps_float(val):
    """Return ``val`` encoded as the CBOR_FLOAT64 initial byte plus an 8-byte big-endian double."""
    initial_byte = struct.pack("!B", CBOR_FLOAT64)
    return initial_byte + struct.pack("!d", val)
# Pre-packed single-byte header for a negative bignum (tag 3).
_CBOR_TAG_NEGBIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)


def _encode_type_num(cbor_type, val):
    """Encode major type ``cbor_type`` with the unsigned argument ``val``.

    The shortest form that holds ``val`` is used. For CBOR_NEGINT the 64-bit
    form is only used up to 0x7fffffffffffffff; larger magnitudes are written
    as a negative bignum (tag plus byte string). For every other major type a
    value that does not fit in 64 bits raises ``Exception``.
    """
    assert val >= 0
    if val <= 23:
        # Small arguments live directly in the low 5 bits of the initial byte.
        return struct.pack('B', cbor_type | val)

    for upper_bound, fmt, info_bits in (
        (0x0ff, 'BB', CBOR_UINT8_FOLLOWS),
        (0x0ffff, '!BH', CBOR_UINT16_FOLLOWS),
        (0x0ffffffff, '!BI', CBOR_UINT32_FOLLOWS),
    ):
        if val <= upper_bound:
            return struct.pack(fmt, cbor_type | info_bits, val)

    is_negint = cbor_type == CBOR_NEGINT
    limit_64 = 0x07fffffffffffffff if is_negint else 0x0ffffffffffffffff
    if val <= limit_64:
        return struct.pack('!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)

    if not is_negint:
        raise Exception("value too big for CBOR unsigned number: {0!r}".format(val))

    # Negative integer beyond the 64-bit form: emit it as a negative bignum.
    outb = _dumps_bignum_to_bytearray(val)
    return _CBOR_TAG_NEGBIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(outb)) + outb
def _is_unicode(val):
    """Return True when ``val`` is a text string (``str`` on Python 3, ``unicode`` on Python 2)."""
    if _IS_PY3:
        return isinstance(val, str)
    return isinstance(val, unicode)
def dumps_string(val, is_text=None, is_bytes=None):
    """Encode ``val`` as a CBOR text or byte string.

    Text (unicode) input is UTF-8 encoded and always written as CBOR_TEXT,
    whatever flags were passed. Any other input is written as CBOR_TEXT only
    when ``is_text == True`` and ``is_bytes`` is falsy; otherwise it is
    written as CBOR_BYTES.
    """
    if _is_unicode(val):
        val = val.encode('utf8')
        is_text, is_bytes = True, False

    write_as_text = (is_text == True) and not is_bytes
    major_type = CBOR_TEXT if write_as_text else CBOR_BYTES
    return _encode_type_num(major_type, len(val)) + val
def dumps_array(arr, sort_keys=False):
    """Encode the sequence ``arr`` as a definite-length CBOR array.

    ``sort_keys`` is forwarded to ``dumps`` for every element.
    """
    chunks = [_encode_type_num(CBOR_ARRAY, len(arr))]
    for element in arr:
        chunks.append(dumps(element, sort_keys=sort_keys))
    return b''.join(chunks)
def dumps_dict(d, sort_keys=False):
    """Encode the mapping ``d`` as a definite-length CBOR map.

    With ``sort_keys`` the entries are written in ``sorted()`` key order;
    otherwise in the mapping's own iteration order. ``sort_keys`` is also
    forwarded to ``dumps`` for every key and value.
    """
    parts = [_encode_type_num(CBOR_MAP, len(d))]

    if sort_keys:
        # Python 2 uses the lazy iterkeys(); Python 3 only has keys().
        ordered_keys = sorted(d.keys() if _IS_PY3 else d.iterkeys())
        pairs = ((key, d[key]) for key in ordered_keys)
    else:
        pairs = d.items() if _IS_PY3 else d.iteritems()

    for key, value in pairs:
        parts.append(dumps(key, sort_keys=sort_keys))
        parts.append(dumps(value, sort_keys=sort_keys))
    return b''.join(parts)
def dumps_bool(b):
    """Return the single-byte CBOR encoding of true when ``b`` is truthy, otherwise of false."""
    return struct.pack('B', CBOR_TRUE if b else CBOR_FALSE)
def dumps_tag(t, sort_keys=False):
    """Encode ``t`` as a CBOR tag header for ``t.tag`` followed by the encoding of ``t.value``."""
    tag_header = _encode_type_num(CBOR_TAG, t.tag)
    tagged_payload = dumps(t.value, sort_keys=sort_keys)
    return tag_header + tagged_payload
def _is_stringish(x):
    """Return True when ``x`` is any text or byte string type of the running Python version."""
    if _IS_PY3:
        return isinstance(x, (str, bytes))
    return isinstance(x, (str, basestring, bytes, unicode))


def _is_intish(x):
    """Return True when ``x`` is an integer (``int``, plus ``long`` on Python 2)."""
    if _IS_PY3:
        return isinstance(x, int)
    return isinstance(x, (int, long))
def dumps(ob, sort_keys=False):
    """Serialize ``ob`` to CBOR and return the encoded bytes.

    Handles None, bool, text/byte strings, list/tuple, dict, float, integers
    and ``Tag`` instances. ``sort_keys`` is forwarded to the array, map and
    tag encoders.

    Raises:
        Exception: if ``ob`` is of a type that is not handled here.
    """
    if ob is None:
        return struct.pack('B', CBOR_NULL)
    # bool is tested before the integer check because bool is a subclass of int.
    if isinstance(ob, bool):
        return dumps_bool(ob)
    if _is_stringish(ob):
        return dumps_string(ob)
    if isinstance(ob, (list, tuple)):
        return dumps_array(ob, sort_keys=sort_keys)
    # TODO: accept other enumerables and emit a variable length array
    if isinstance(ob, dict):
        return dumps_dict(ob, sort_keys=sort_keys)
    if isinstance(ob, float):
        return dumps_float(ob)
    if _is_intish(ob):
        return dumps_int(ob)
    if isinstance(ob, Tag):
        return dumps_tag(ob, sort_keys=sort_keys)
    # The type used to be passed as a second Exception argument, which left
    # the "%s" placeholder uninterpolated in the reported message.
    raise Exception("don't know how to cbor serialize object of type %s" % (type(ob),))
# Mirrors the basic signature of json.dump, but with no options (yet).
def dump(obj, fp, sort_keys=False):
    """
    Serialize obj to CBOR and write the result to fp.

    obj: Python object to serialize
    fp: file-like object capable of .write(bytes)
    """
    # The whole encoding is built in memory and written in one call; this is
    # kinda lame, but probably not inefficient for non-huge objects.
    # TODO: .write() to fp as we go as each inner object is serialized
    fp.write(dumps(obj, sort_keys=sort_keys))
class Tag(object):
    """A CBOR tag number paired with the value it annotates."""

    def __init__(self, tag=None, value=None):
        # tag: the tag number; value: the tagged object.
        self.tag = tag
        self.value = value

    def __repr__(self):
        return "Tag({0!r}, {1!r})".format(self.tag, self.value)

    def __eq__(self, other):
        """Two tags are equal when both the tag number and the value match."""
        if not isinstance(other, Tag):
            return False
        return (self.tag == other.tag) and (self.value == other.value)

    def __ne__(self, other):
        """Inverse of __eq__.

        Python 2 does not derive != from __eq__, so without this method two
        equal tags would still compare as unequal there.
        """
        return not self.__eq__(other)
def loads(data):
    """
    Decode the first CBOR item in the bytes ``data`` and return it as a Python object.

    Raises ValueError when ``data`` is None.
    """
    if data is None:
        raise ValueError("got None for buffer to decode in loads")
    stream = StringIO(data)
    decoded_and_length = _loads(stream)
    # _loads returns (object, bytes read); only the object is exposed here.
    return decoded_and_length[0]
def load(fp):
    """
    Decode one CBOR item from fp, a file-like object supporting .read(n),
    and return it as a Python object.
    """
    decoded_and_length = _loads(fp)
    # _loads returns (object, bytes read); only the object is exposed here.
    return decoded_and_length[0]
# Nesting depth above which _loads refuses to continue.
_MAX_DEPTH = 100


def _tag_aux(fp, tb):
    """Split the initial byte ``tb`` and read the argument that follows it.

    Returns ``(tag, tag_aux, aux, bytes_read)``:
      tag        -- major type bits of ``tb`` (``tb & CBOR_TYPE_MASK``)
      tag_aux    -- low 5 info bits of ``tb``
      aux        -- the decoded argument, or None when the info bits are
                    CBOR_VAR_FOLLOWS (variable length)
      bytes_read -- 1 for ``tb`` itself plus the argument bytes read from fp

    Raises:
        ValueError: if the info bits are one of the values that are neither
            an inline value, a sized argument, nor CBOR_VAR_FOLLOWS.
    """
    bytes_read = 1
    tag = tb & CBOR_TYPE_MASK
    tag_aux = tb & CBOR_INFO_BITS
    if tag_aux <= 23:
        # The argument is stored inline in the info bits.
        aux = tag_aux
    elif tag_aux == CBOR_UINT8_FOLLOWS:
        data = fp.read(1)
        aux = struct.unpack_from("!B", data, 0)[0]
        bytes_read += 1
    elif tag_aux == CBOR_UINT16_FOLLOWS:
        data = fp.read(2)
        aux = struct.unpack_from("!H", data, 0)[0]
        bytes_read += 2
    elif tag_aux == CBOR_UINT32_FOLLOWS:
        data = fp.read(4)
        aux = struct.unpack_from("!I", data, 0)[0]
        bytes_read += 4
    elif tag_aux == CBOR_UINT64_FOLLOWS:
        data = fp.read(8)
        aux = struct.unpack_from("!Q", data, 0)[0]
        bytes_read += 8
    elif tag_aux == CBOR_VAR_FOLLOWS:
        aux = None
    else:
        # Malformed input must be rejected with a real exception: an assert
        # is stripped under -O and would let the bogus byte through as a
        # variable-length marker.
        raise ValueError("bogus tag {0:02x}".format(tb))
    return tag, tag_aux, aux, bytes_read
def _read_byte(fp):
    """Read one byte from ``fp`` and return it as an int; raise EOFError at end of input."""
    chunk = fp.read(1)
    if len(chunk) != 0:
        return ord(chunk)
    # An empty read is treated as end of file, though
    # I guess not all file-like objects do this.
    raise EOFError()
def _loads_var_array(fp, limit, depth, returntags, bytes_read):
    """Decode the elements of a variable-length array up to the break byte.

    ``bytes_read`` is the number of bytes already consumed for the array
    header. Returns ``(list, total bytes read)`` with the break byte included.
    """
    ob = []
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        # Elements sit one nesting level below the array, hence depth + 1.
        (subob, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        # sub_len already includes the initial byte tb, so it must not be
        # counted a second time here.
        bytes_read += sub_len
        ob.append(subob)
        tb = _read_byte(fp)
    # + 1 accounts for the terminating break byte.
    return (ob, bytes_read + 1)
def _loads_var_map(fp, limit, depth, returntags, bytes_read):
    """Decode the key/value pairs of a variable-length map up to the break byte.

    ``bytes_read`` is the number of bytes already consumed for the map
    header. Returns ``(dict, total bytes read)`` with the break byte included.
    """
    ob = {}
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        # Keys and values sit one nesting level below the map, hence depth + 1.
        (subk, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        # sub_len already includes the initial byte tb, so it must not be
        # counted a second time here.
        bytes_read += sub_len
        (subv, sub_len) = _loads(fp, limit, depth + 1, returntags)
        bytes_read += sub_len
        ob[subk] = subv
        tb = _read_byte(fp)
    # + 1 accounts for the terminating break byte.
    return (ob, bytes_read + 1)
def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
    """Decode ``aux`` consecutive items from ``fp`` into a list.

    ``bytes_read`` is the number of bytes already consumed for the array
    header. Returns ``(list, total bytes read)``.
    """
    ob = []
    # A countdown avoids the range/xrange split between Python 2 and 3 and
    # never materializes a sequence of ``aux`` integers.
    remaining = aux
    while remaining > 0:
        # limit/returntags are forwarded and depth is increased so that the
        # recursion limit checked in _loads actually applies to nested data.
        subob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob.append(subob)
        remaining -= 1
    return ob, bytes_read


def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
    """Decode ``aux`` consecutive key/value pairs from ``fp`` into a dict.

    ``bytes_read`` is the number of bytes already consumed for the map
    header. Returns ``(dict, total bytes read)``.
    """
    ob = {}
    remaining = aux
    while remaining > 0:
        subk, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        subv, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob[subk] = subv
        remaining -= 1
    return ob, bytes_read
def _loads(fp, limit=None, depth=0, returntags=False):
    """Decode one CBOR item from ``fp`` and return ``(object, bytes read)``.

    Raises Exception when ``depth`` exceeds _MAX_DEPTH, and EOFError (from
    _read_byte) when no initial byte is available.
    """
    if depth > _MAX_DEPTH:
        raise Exception("hit CBOR loads recursion depth limit")
    initial_byte = _read_byte(fp)
    return _loads_tb(fp, initial_byte, limit, depth, returntags)
def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
    """Decode the item whose initial byte ``tb`` was already read from ``fp``.

    Returns ``(object, bytes read)``, where the count includes ``tb`` itself.
    ``limit`` is only passed through to the nested loaders. When
    ``returntags`` is true, tagged items come back as ``Tag`` objects instead
    of being interpreted by ``tagify``.

    Raises:
        Exception: if ``depth`` exceeds _MAX_DEPTH.
        ValueError: for a major-type-7 byte that is not handled here.
    """
    # Elements of variable-length containers enter here directly, bypassing
    # the depth check in _loads, so the limit is enforced here as well.
    if depth > _MAX_DEPTH:
        raise Exception("hit CBOR loads recursion depth limit")

    # Some special cases of CBOR_7 best handled by special struct.unpack logic here
    if tb == CBOR_FLOAT16:
        data = fp.read(2)
        hibyte, lowbyte = struct.unpack_from("BB", data, 0)
        # Half precision layout: 1 sign bit, 5 exponent bits, 10 mantissa bits.
        exp = (hibyte >> 2) & 0x1F
        mant = ((hibyte & 0x03) << 8) | lowbyte
        if exp == 0:
            # Zero exponent: no implicit leading one.
            val = mant * (2.0 ** -24)
        elif exp == 31:
            val = float('Inf') if mant == 0 else float('NaN')
        else:
            # 1024.0 supplies the implicit leading one of the 10-bit mantissa.
            val = (mant + 1024.0) * (2 ** (exp - 25))
        if hibyte & 0x80:
            val = -1.0 * val
        return (val, 3)
    elif tb == CBOR_FLOAT32:
        data = fp.read(4)
        pf = struct.unpack_from("!f", data, 0)
        return (pf[0], 5)
    elif tb == CBOR_FLOAT64:
        data = fp.read(8)
        pf = struct.unpack_from("!d", data, 0)
        return (pf[0], 9)

    tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)

    if tag == CBOR_UINT:
        return (aux, bytes_read)
    elif tag == CBOR_NEGINT:
        return (-1 - aux, bytes_read)
    elif tag == CBOR_BYTES:
        ob, subpos = loads_bytes(fp, aux)
        return (ob, bytes_read + subpos)
    elif tag == CBOR_TEXT:
        raw, subpos = loads_bytes(fp, aux, btag=CBOR_TEXT)
        ob = raw.decode('utf8')
        return (ob, bytes_read + subpos)
    elif tag == CBOR_ARRAY:
        # aux is None when the header announced a variable-length container.
        if aux is None:
            return _loads_var_array(fp, limit, depth, returntags, bytes_read)
        return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_MAP:
        if aux is None:
            return _loads_var_map(fp, limit, depth, returntags, bytes_read)
        return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_TAG:
        # Forward the decoding options: calling _loads(fp) bare would reset
        # depth to 0 and drop returntags for everything inside the tag.
        ob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        if returntags:
            # Don't interpret the tag, return it and the tagged object.
            ob = Tag(aux, ob)
        else:
            # attempt to interpret the tag and the value into a Python object.
            ob = tagify(ob, aux)
        return ob, bytes_read
    elif tag == CBOR_7:
        if tb == CBOR_TRUE:
            return (True, bytes_read)
        if tb == CBOR_FALSE:
            return (False, bytes_read)
        if tb == CBOR_NULL:
            return (None, bytes_read)
        if tb == CBOR_UNDEFINED:
            # 'undefined' has no Python counterpart and is mapped to None.
            return (None, bytes_read)
        # Explicit field index, consistent with the other format strings in
        # this file.
        raise ValueError("unknown cbor tag 7 byte: {0:02x}".format(tb))
def loads_bytes(fp, aux, btag=CBOR_BYTES):
    """Read the payload of a byte or text string from ``fp``.

    ``aux`` is the length taken from the string header, or None for a
    variable-length string made of chunks terminated by a break byte.
    ``btag`` is the major type every chunk must have.

    Returns ``(raw bytes, number of bytes consumed after the header)``.

    Raises:
        EOFError: if the input ends inside a variable-length string.
        ValueError: if a chunk has the wrong major type or is itself of
            variable length.
    """
    # TODO: limit to some maximum number of chunks and some maximum total bytes
    if aux is not None:
        # simple case
        # NOTE(review): a short read (fewer than aux bytes available) is not
        # detected here; the truncated data is returned as-is.
        ob = fp.read(aux)
        return (ob, aux)
    # read chunks of bytes
    chunklist = []
    total_bytes_read = 0
    while True:
        # _read_byte raises EOFError on truncated input and returns an int on
        # both Python 2 and 3; indexing the result of fp.read(1) raised a
        # bare IndexError instead.
        tb = _read_byte(fp)
        if tb == CBOR_BREAK:
            total_bytes_read += 1
            break
        tag, tag_aux, chunk_len, bytes_read = _tag_aux(fp, tb)
        # Malformed input is rejected with ValueError rather than assert,
        # which is stripped under -O.
        if tag != btag:
            raise ValueError('variable length value contains unexpected component')
        if chunk_len is None:
            # fp.read(None) would swallow the rest of the stream.
            raise ValueError('variable length value contains a variable length chunk')
        chunklist.append(fp.read(chunk_len))
        total_bytes_read += bytes_read + chunk_len
    return (b''.join(chunklist), total_bytes_read)
def _bytes_to_biguint(bs):
    """Interpret the byte string ``bs`` as a big-endian unsigned integer; empty input gives 0."""
    total = 0
    for item in bs:
        # Iterating bytes yields ints on Python 3 but 1-char strings on Python 2.
        byte_value = item if _IS_PY3 else ord(item)
        total = (total << 8) | byte_value
    return total
def tagify(ob, aux):
    """Turn the tagged value ``ob`` with tag number ``aux`` into a Python object.

    Epoch timestamps become ``datetime`` objects, bignums become integers and
    regex tags become compiled patterns. Every other tag, including the
    date-string tag, is returned wrapped in ``Tag(aux, ob)``.
    """
    # TODO: make this extensible?
    # cbor.register_tag_handler(tagnumber, tag_handler)
    # where tag_handler takes (tagnumber, tagged_object)
    if aux == CBOR_TAG_DATE_ARRAY:
        result = datetime.datetime.utcfromtimestamp(ob)
    elif aux == CBOR_TAG_BIGNUM:
        result = _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_NEGBIGNUM:
        result = -1 - _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_REGEX:
        # Is this actually a good idea? Should we just return the tag and the raw value to the user somehow?
        result = re.compile(ob)
    else:
        # Also reached for CBOR_TAG_DATE_STRING.
        # TODO: parse RFC3339 date string
        result = Tag(aux, ob)
    return result