This repository was archived by the owner on May 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathmultiaddr.py
238 lines (182 loc) · 7.57 KB
/
multiaddr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import collections.abc
import varint
from . import exceptions, protocols
from .transforms import bytes_iter
from .transforms import string_to_bytes
from .transforms import bytes_to_string
# Names exported by `from <this module> import *`; only the Multiaddr class
# is listed, so the MultiAddr* view classes below are not star-exported.
__all__ = ("Multiaddr",)
class MultiAddrKeys(collections.abc.KeysView, collections.abc.Sequence):
    """Indexable key view over the protocols of the wrapped mapping.

    The view re-parses ``self._mapping.to_bytes()`` with ``bytes_iter`` on
    every iteration and yields the protocol entry of each parsed component.
    """

    # KeysView inherits Set.__eq__, which leaves the class unhashable unless
    # a hash implementation is named explicitly.
    __hash__ = collections.abc.KeysView._hash

    def __iter__(self):
        """Yield the protocol of every component, in ``bytes_iter`` order."""
        encoded = self._mapping.to_bytes()
        for _offset, protocol, _codec, _part in bytes_iter(encoded):
            yield protocol

    def __getitem__(self, idx):
        """Return the protocol at position ``idx``.

        Negative positions are counted from the end. Raises ``IndexError``
        when no component sits at the requested position.
        """
        wanted = len(self) + idx if idx < 0 else idx

        # There is no random access into the encoded form; scan linearly.
        for position, protocol in enumerate(self):
            if position == wanted:
                return protocol
        raise IndexError("Protocol list index out of range")

    def __contains__(self, proto):
        """Check membership after resolving ``proto`` via the registry.

        ``proto`` is first passed through ``registry.find`` of the wrapped
        mapping, then searched for with the generic Sequence scan.
        """
        resolved = self._mapping.registry.find(proto)
        return collections.abc.Sequence.__contains__(self, resolved)
class MultiAddrItems(collections.abc.ItemsView, collections.abc.Sequence):
    """Indexable item view yielding ``(protocol, value)`` pairs.

    Each iteration re-parses ``self._mapping.to_bytes()`` with ``bytes_iter``.
    The value is the codec's string form of the component payload, or
    ``None`` for components whose codec reports ``SIZE == 0``.
    """

    def __iter__(self):
        """Yield one ``(protocol, value)`` pair per parsed component.

        Any exception raised while producing a decoded pair is re-raised as
        ``exceptions.BinaryParseError`` chained to the original exception.
        """
        for _offset, protocol, codec, payload in bytes_iter(self._mapping.to_bytes()):
            if codec.SIZE == 0:
                # Something like '/utp' carries no address, so there is
                # nothing to decode: report None as its value.
                yield protocol, None
                continue

            try:
                # The component carries an address: hand back its string form.
                yield protocol, codec.to_string(protocol, payload)
            except Exception as exc:
                raise exceptions.BinaryParseError(
                    str(exc),
                    self._mapping.to_bytes(),
                    protocol.name,
                    exc,
                ) from exc

    def __getitem__(self, idx):
        """Return the ``(protocol, value)`` pair at position ``idx``.

        Negative positions are counted from the end. Raises ``IndexError``
        when no component sits at the requested position.
        """
        wanted = len(self) + idx if idx < 0 else idx

        for position, pair in enumerate(self):
            if position == wanted:
                return pair
        raise IndexError("Protocol item list index out of range")

    def __contains__(self, item):
        """Check whether the ``(proto, value)`` pair ``item`` is present.

        The protocol half is resolved through ``registry.find`` of the
        wrapped mapping before the generic Sequence scan is performed.
        """
        proto, value = item
        resolved = self._mapping.registry.find(proto)
        return collections.abc.Sequence.__contains__(self, (resolved, value))
class MultiAddrValues(collections.abc.ValuesView, collections.abc.Sequence):
    """Indexable value view: the value half of every ``MultiAddrItems`` pair."""

    # Use the generic Sequence scan for membership tests.
    __contains__ = collections.abc.Sequence.__contains__

    def __iter__(self):
        """Yield the value of each item of the wrapped mapping, in order."""
        for _protocol, value in MultiAddrItems(self._mapping):
            yield value

    def __getitem__(self, idx):
        """Return the value at position ``idx``.

        Negative positions are counted from the end. Raises ``IndexError``
        when no component sits at the requested position.
        """
        wanted = len(self) + idx if idx < 0 else idx

        for position, value in enumerate(self):
            if position == wanted:
                return value
        raise IndexError("Protocol value list index out of range")
class Multiaddr(collections.abc.Mapping):
    """Multiaddr is a representation of multiple nested internet addresses.

    Multiaddr is a cross-protocol, cross-platform format for representing
    internet addresses. It emphasizes explicitness and self-description.

    Learn more here: https://multiformats.io/multiaddr/

    Multiaddrs have both a binary and string representation.

        >>> from multiaddr import Multiaddr
        >>> addr = Multiaddr("/ip4/1.2.3.4/tcp/80")

    Multiaddr objects are immutable, so `encapsulate` and `decapsulate`
    return new objects rather than modify internal state.

    The class also implements the read-only mapping interface: iterating
    yields the contained protocols and indexing by a protocol returns the
    value stored for its first occurrence (see `value_for_protocol`).
    """

    __slots__ = ("_bytes", "registry")

    def __init__(self, addr, *, registry=protocols.REGISTRY):
        """Instantiate a new Multiaddr.

        Args:
            addr : A string-encoded or a byte-encoded Multiaddr, or another
                Multiaddr instance whose binary form is reused
            registry : Protocol registry used to resolve protocol lookups
                made through this object (keyword-only)

        Raises:
            TypeError: ``addr`` is not a ``str``, ``bytes`` or Multiaddr
        """
        self.registry = registry
        if isinstance(addr, str):
            self._bytes = string_to_bytes(addr)
        elif isinstance(addr, bytes):
            # Stored as given; the binary form is not parsed at this point.
            self._bytes = addr
        elif isinstance(addr, Multiaddr):
            self._bytes = addr.to_bytes()
        else:
            raise TypeError("MultiAddr must be bytes, str or another MultiAddr instance")

    @classmethod
    def join(cls, *addrs):
        """Concatenate the values of the given MultiAddr strings or objects,
        encapsulating each successive MultiAddr value with the previous ones."""
        return cls(b"".join(cls(addr).to_bytes() for addr in addrs))

    def __eq__(self, other):
        """Checks if two Multiaddr objects are exactly equal.

        Equality is decided on the binary representation. Comparing with an
        object that is not a Multiaddr yields ``NotImplemented`` so Python
        can fall back to its default handling instead of raising.
        """
        if not isinstance(other, Multiaddr):
            return NotImplemented
        return self._bytes == other._bytes

    def __str__(self):
        """Return the string representation of this Multiaddr.

        May raise a :class:`~multiaddr.exceptions.BinaryParseError` if the
        stored MultiAddr binary representation is invalid."""
        return bytes_to_string(self._bytes)

    def __contains__(self, proto):
        """Return whether ``proto`` is one of the protocols of this Multiaddr."""
        return proto in MultiAddrKeys(self)

    def __iter__(self):
        """Iterate over the protocols of this Multiaddr, in order."""
        return iter(MultiAddrKeys(self))

    def __len__(self):
        """Return the number of components found by parsing the binary form."""
        return sum(1 for _ in bytes_iter(self.to_bytes()))

    def __repr__(self):
        """Return a debugging representation embedding the string form."""
        return "<Multiaddr %s>" % str(self)

    def __hash__(self):
        """Hash on the binary representation, consistent with `__eq__`."""
        return hash(self._bytes)

    def to_bytes(self):
        """Returns the byte array representation of this Multiaddr."""
        return self._bytes

    __bytes__ = to_bytes

    def protocols(self):
        """Returns a sequence-like view of the Protocols this Multiaddr includes."""
        return MultiAddrKeys(self)

    def split(self, maxsplit=-1):
        """Returns the list of individual path components this MultiAddr is made
        up of.

        At most ``maxsplit`` single-component items are produced; whatever
        remains after that is appended as one final Multiaddr. With the
        default of ``-1`` every component becomes its own item.
        """
        final_split_offset = -1
        results = []
        for idx, (offset, proto, codec, part_value) in enumerate(bytes_iter(self._bytes)):
            # Split at most `maxsplit` times
            if idx == maxsplit:
                final_split_offset = offset
                break

            # Re-assemble binary MultiAddr representation; only codecs with
            # a negative SIZE get an explicit varint length prefix.
            part_size = varint.encode(len(part_value)) if codec.SIZE < 0 else b""
            part = b"".join((proto.vcode, part_size, part_value))

            # Add MultiAddr with the given value
            results.append(self.__class__(part))

        # Add final item with remainder of MultiAddr if there is anything left
        if final_split_offset >= 0:
            results.append(self.__class__(self._bytes[final_split_offset:]))
        return results

    keys = protocols

    def items(self):
        """Return a sequence-like view of ``(protocol, value)`` pairs."""
        return MultiAddrItems(self)

    def values(self):
        """Return a sequence-like view of the component values."""
        return MultiAddrValues(self)

    def encapsulate(self, other):
        """Wrap this Multiaddr around another.

        For example:
            /ip4/1.2.3.4 encapsulate /tcp/80 = /ip4/1.2.3.4/tcp/80
        """
        return self.join(self, other)

    def decapsulate(self, other):
        """Remove a Multiaddr wrapping.

        Everything from the last occurrence of ``other`` onwards is dropped.

        For example:
            /ip4/1.2.3.4/tcp/80 decapsulate /tcp/80 = /ip4/1.2.3.4

        ``other`` is only matched at offsets where a component of this
        Multiaddr starts, so bytes that merely happen to occur inside
        another component's value are never mistaken for a match. If
        ``other`` is empty or not contained, a copy of this Multiaddr is
        returned. Errors raised by ``bytes_iter`` while parsing the stored
        binary representation propagate to the caller.
        """
        s1 = self.to_bytes()
        s2 = Multiaddr(other).to_bytes()

        if s2:
            # bytes_iter reports the offset at which each component starts;
            # trying them last-to-first keeps the "last occurrence wins"
            # behaviour of a reverse search.
            starts = [offset for offset, _, _, _ in bytes_iter(s1)]
            for start in reversed(starts):
                if s1.startswith(s2, start):
                    return Multiaddr(s1[:start])

        # if multiaddr not contained, returns a copy
        return Multiaddr(self)

    def value_for_protocol(self, proto):
        """Return the value (if any) following the specified protocol

        Only the first occurrence of the protocol is considered.

        Returns
        -------
        Union[object, NoneType]
            The parsed protocol value for the given protocol code or ``None``
            if the given protocol does not require any value

        Raises
        ------
        ~multiaddr.exceptions.BinaryParseError
            The stored MultiAddr binary representation is invalid
        ~multiaddr.exceptions.ProtocolLookupError
            MultiAddr does not contain any instance of this protocol
        """
        proto = self.registry.find(proto)
        for proto2, value in self.items():
            # Identity check first as a cheap shortcut before comparing.
            if proto2 is proto or proto2 == proto:
                return value
        raise exceptions.ProtocolLookupError(proto, str(self))

    __getitem__ = value_for_protocol