/
codec.py
212 lines (179 loc) · 7.89 KB
/
codec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import collections
import sys

try:
    from collections.abc import Sequence
except ImportError:  # Python 2: the ABCs still live directly in `collections`
    from collections import Sequence

from .exceptions import EncodingError, DecodingError
from .utils import (Atomic, str_to_bytes, is_integer, ascii_chr, safe_ord, big_endian_to_int,
                    int_to_big_endian)
from .sedes.binary import Binary as BinaryClass
from .sedes import big_endian_int, binary
from .sedes.lists import List, is_sedes

if sys.version_info.major == 2:
    from itertools import imap as map
def encode(obj, sedes=None, infer_serializer=True):
    """Encode a Python object in RLP format.

    By default, the object is serialized in a suitable way first (using
    :func:`rlp.infer_sedes`) and then encoded. Serialization can be
    explicitly suppressed by setting `infer_serializer` to ``False`` and not
    passing an alternative as `sedes`.

    :param obj: the object to encode
    :param sedes: an object implementing a function ``serialize(obj)`` which
                  will be used to serialize ``obj`` before encoding, or
                  ``None`` to use the inferred one (if any)
    :param infer_serializer: if ``True`` an appropriate serializer will be
                             selected using :func:`rlp.infer_sedes` to
                             serialize `obj` before encoding
    :returns: the RLP encoded item
    :raises: :exc:`rlp.EncodingError` in the rather unlikely case that the item
             is too big to encode (will not happen)
    :raises: :exc:`rlp.SerializationError` if the serialization fails
    """
    # Test against None instead of truthiness: a sedes object may itself be
    # falsy (for instance a sequence-like sedes with no elements) and must
    # still be honoured rather than silently replaced by the inferred one.
    if sedes is not None:
        item = sedes.serialize(obj)
    elif infer_serializer:
        item = infer_sedes(obj).serialize(obj)
    else:
        # Caller promised that `obj` is already in encodable form.
        item = obj
    return encode_raw(item)
class RLPData(str):
    """Marker type for data that is already RLP encoded.

    :func:`encode_raw` hands instances of this class back unchanged instead
    of encoding them a second time.
    """
def encode_raw(item):
    """RLP encode (a nested sequence of) :class:`Atomic`s.

    :param item: an :class:`RLPData` instance (returned unchanged), an
                 :class:`Atomic`, or a sequence whose elements are themselves
                 encodable by this function
    :returns: the RLP encoding of ``item``
    :raises: :exc:`EncodingError` if ``item`` is of an unsupported type or its
             payload is too long for a length prefix
    """
    if isinstance(item, RLPData):
        # Already encoded; pass through untouched.
        return item
    elif isinstance(item, Atomic):
        # A single byte below 128 is its own encoding and gets no prefix.
        if len(item) == 1 and safe_ord(item[0]) < 128:
            return str_to_bytes(item)
        payload = str_to_bytes(item)
        prefix_offset = 128  # string
    # `Sequence` is the ABC imported at module level; the old
    # `collections.Sequence` alias no longer exists on Python 3.10+.
    elif isinstance(item, Sequence):
        payload = b''.join(map(encode_raw, item))
        prefix_offset = 192  # list
    else:
        msg = 'Cannot encode object of type {0}'.format(type(item).__name__)
        raise EncodingError(msg, item)

    try:
        prefix = length_prefix(len(payload), prefix_offset)
    except ValueError:
        raise EncodingError('Item too big to encode', item)
    return prefix + payload
def length_prefix(length, offset):
    """Build the prefix that announces the length of a string or list payload.

    Payloads shorter than 56 bytes get a single prefix byte
    ``offset + length``. Longer payloads get a first byte of
    ``offset + 55 + n`` followed by the ``n`` bytes that
    ``int_to_big_endian(length)`` returns.

    :param length: the length of the item in bytes
    :param offset: ``0x80`` when encoding raw bytes, ``0xc0`` when encoding a
                   list
    :returns: the prefix to put in front of the payload
    :raises: :exc:`ValueError` if ``length`` is not smaller than ``256**8``
    """
    # Short form: the length fits into the prefix byte itself.
    if length < 56:
        return ascii_chr(offset + length)

    # Long form: the prefix byte states how many length bytes follow.
    if length < 256**8:
        encoded_length = int_to_big_endian(length)
        marker = ascii_chr(offset + 56 - 1 + len(encoded_length))
        return marker + encoded_length

    raise ValueError('Length greater than 256**8')
def _consume_long_length(rlp, start, length_size):
    """Read the explicit length field of a long string or long list prefix.

    :param rlp: the rlp string to read from
    :param start: the position of the prefix's first byte
    :param length_size: the number of bytes occupied by the length field
    :returns: the payload length stored in the length field
    :raises: :exc:`DecodingError` if the length field starts with a zero byte
             or is cut off by the end of the input
    """
    length_bytes = rlp[start + 1:start + 1 + length_size]
    if length_bytes[:1] == b'\x00':
        raise DecodingError('Length starts with zero bytes', rlp)
    if len(length_bytes) < length_size:
        # Slicing never raises, so a cut-off length field must be detected
        # explicitly; otherwise a wrong (too small) length would be decoded.
        raise DecodingError('RLP string ends within length prefix', rlp)
    return big_endian_to_int(length_bytes)


def consume_length_prefix(rlp, start):
    """Read a length prefix from an RLP string.

    :param rlp: the rlp string to read from
    :param start: the position at which to start reading
    :returns: a tuple ``(type, length, end)``, where ``type`` is either ``str``
              or ``list`` depending on the type of the following payload,
              ``length`` is the length of the payload in bytes, and ``end`` is
              the position of the first payload byte in the rlp string
    :raises: :exc:`DecodingError` if the prefix is not in its shortest
             possible form or its length field is incomplete
    :raises: :exc:`IndexError` if a prefix byte that is read by index lies
             beyond the end of ``rlp``
    """
    b0 = safe_ord(rlp[start])
    if b0 < 128:  # single byte
        # The byte is its own payload, so the payload starts at `start`.
        return (str, 1, start)
    elif b0 < 128 + 56:  # short string
        if b0 - 128 == 1 and safe_ord(rlp[start + 1]) < 128:
            raise DecodingError('Encoded as short string although single byte was possible', rlp)
        return (str, b0 - 128, start + 1)
    elif b0 < 192:  # long string
        length_size = b0 - 128 - 56 + 1
        length = _consume_long_length(rlp, start, length_size)
        # Mirror the long list check: payloads below 56 bytes must use the
        # short form, anything else is a non-canonical encoding.
        if length < 56:
            raise DecodingError('Long string prefix used for short string', rlp)
        return (str, length, start + 1 + length_size)
    elif b0 < 192 + 56:  # short list
        return (list, b0 - 192, start + 1)
    else:  # long list
        length_size = b0 - 192 - 56 + 1
        length = _consume_long_length(rlp, start, length_size)
        if length < 56:
            raise DecodingError('Long list prefix used for short list', rlp)
        return (list, length, start + 1 + length_size)
def consume_payload(rlp, start, type_, length):
    """Read the payload of an item from an RLP string.

    :param rlp: the rlp string to read from
    :param type_: the type of the payload (``str`` or ``list``)
    :param start: the position at which to start reading
    :param length: the length of the payload in bytes
    :returns: a tuple ``(item, end)``, where ``item`` is the read item and
              ``end`` is the position of the first unprocessed byte
    :raises: :exc:`DecodingError` if a string payload extends beyond the end
             of ``rlp`` or the items of a list overrun the announced length
    :raises: :exc:`TypeError` if ``type_`` is neither ``str`` nor ``list``
    """
    if type_ == str:
        end = start + length
        # Slicing silently truncates, so without this check a cut-off payload
        # would be returned together with an `end` beyond the input.
        if end > len(rlp):
            raise DecodingError('RLP string ends before the announced payload length', rlp)
        return (rlp[start:end], end)
    elif type_ == list:
        items = []
        next_item_start = start
        end = next_item_start + length
        # Items are stored back to back; keep reading until the announced
        # payload length is used up.
        while next_item_start < end:
            item, next_item_start = consume_item(rlp, next_item_start)
            items.append(item)
        if next_item_start > end:
            raise DecodingError('List length prefix announced a too small '
                                'length', rlp)
        return (items, next_item_start)
    else:
        raise TypeError('Type must be either list or str')
def consume_item(rlp, start):
    """Read one complete item (length prefix plus payload) from an RLP string.

    :param rlp: the rlp string to read from
    :param start: the position at which to start reading
    :returns: a tuple ``(item, end)`` where ``item`` is the read item and
              ``end`` is the position of the first unprocessed byte
    """
    # The prefix tells what kind of payload follows, how long it is and where
    # it begins; the payload reader does the rest.
    payload_type, payload_length, payload_start = consume_length_prefix(rlp, start)
    return consume_payload(rlp, payload_start, payload_type, payload_length)
def decode(rlp, sedes=None, strict=True, **kwargs):
    r"""Decode an RLP encoded object.

    :param rlp: the RLP encoded data to decode
    :param sedes: an object implementing a function ``deserialize(code)`` which
                  will be applied after decoding, or ``None`` if no
                  deserialization should be performed
    :param \*\*kwargs: additional keyword arguments that will be passed to the
                     deserializer
    :param strict: if false inputs that are longer than necessary don't cause
                   an exception
    :returns: the decoded and maybe deserialized Python object
    :raises: :exc:`rlp.DecodingError` if the input ends before the root item
             is complete
    :raises: :exc:`rlp.DecodingError` if the input string does not end after
             the root item and `strict` is true
    :raises: :exc:`rlp.DeserializationError` if the deserialization fails
    """
    rlp = str_to_bytes(rlp)
    try:
        item, end = consume_item(rlp, 0)
    except IndexError:
        raise DecodingError('RLP string to short', rlp)
    # Reading by slice does not raise on truncated input, so an item may claim
    # to end beyond the data. Reject that regardless of `strict` instead of
    # returning a cut-off item or reporting a negative number of extra bytes.
    if end > len(rlp):
        raise DecodingError('RLP string to short', rlp)
    if end != len(rlp) and strict:
        msg = 'RLP string ends with {} superfluous bytes'.format(len(rlp) - end)
        raise DecodingError(msg, rlp)
    # Test against None instead of truthiness: a sedes object may itself be
    # falsy (for instance a sequence-like sedes with no elements) and must
    # still be applied.
    if sedes is not None:
        return sedes.deserialize(item, **kwargs)
    return item
def infer_sedes(obj):
    """Try to find a sedes objects suitable for a given Python object.

    The sedes objects considered are `obj`'s class, `big_endian_int` and
    `binary`. If `obj` is a sequence, a :class:`rlp.sedes.List` will be
    constructed recursively.

    :param obj: the python object for which to find a sedes object
    :returns: the sedes object found for ``obj``
    :raises: :exc:`TypeError` if no appropriate sedes could be found
    """
    # The candidates are tried in order; the first match wins.
    if is_sedes(obj.__class__):
        return obj.__class__
    if is_integer(obj) and obj >= 0:
        return big_endian_int
    if BinaryClass.is_valid_type(obj):
        return binary
    # `Sequence` is the ABC imported at module level; the old
    # `collections.Sequence` alias no longer exists on Python 3.10+.
    if isinstance(obj, Sequence):
        return List(map(infer_sedes, obj))
    msg = 'Did not find sedes handling type {}'.format(type(obj).__name__)
    raise TypeError(msg)