/
structures.py
723 lines (613 loc) · 23.8 KB
/
structures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Interfaces and classes to read structured data.
"""
from __future__ import annotations
import contextlib
import itertools
import enum
import functools
import io
import re
import struct
import weakref
from typing import List, Union, Tuple, Optional, Iterable, ByteString, TypeVar, Generic, Any, Dict
# Type variable for the byte buffer wrapped by a MemoryFile; it is bound to the
# three buffer types named in the union below.
T = TypeVar('T', bound=Union[bytearray, bytes, memoryview])
# Scalar value types used in the return annotation of StructReader.read_struct.
UnpackType = Union[int, bool, float, bytes]
def signed(k: int, bits: int):
    """
    Reinterpret the lowest `bits` bits of the integer `k` as a two's complement signed
    number of that width and return the resulting (possibly negative) integer.
    """
    modulus = 1 << bits
    # Discard everything above the requested width.
    k &= modulus - 1
    # A set top bit marks a negative value in two's complement.
    if k >> (bits - 1):
        return k - modulus
    return k
class EOF(EOFError):
    """
    Raised when a buffer ends before an operation could complete. The attribute `rest`
    carries the bytes that were passed to the constructor; converting the exception to
    `bytes` returns a copy of them.
    """

    def __init__(self, rest: ByteString = B''):
        self.rest = rest
        super().__init__('Unexpected end of buffer.')

    def __bytes__(self):
        leftover = self.rest
        return bytes(leftover)
class StreamDetour:
    """
    A context manager which records the position of a stream on entry, optionally seeks
    to `offset` (interpreted according to `whence`), and seeks back to the recorded
    position on exit. The recorded position is available as the attribute `cursor`.
    """

    def __init__(self, stream: io.IOBase, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        self.whence = whence
        self.offset = offset
        self.stream = stream

    def __enter__(self):
        stream = self.stream
        self.cursor = stream.tell()
        target = self.offset
        if target is None:
            # No detour target given: only remember where to return to.
            return self
        stream.seek(target, self.whence)
        return self

    def __exit__(self, *args):
        origin = self.cursor
        self.stream.seek(origin, io.SEEK_SET)
class MemoryFile(Generic[T], io.IOBase):
    """
    A thin wrapper around (potentially mutable) byte sequences which gives it the
    features of a file-like object. An optional size limit is enforced by the
    `write` and `write_byte` methods, which raise `EOF` when it would be exceeded.
    """

    closed: bool
    read_as_bytes: bool
    _data: T
    _cursor: int
    _closed: bool

    class SEEK(int, enum.Enum):
        """
        Enumeration of the `whence` constants of the `io` module.
        """
        CUR = io.SEEK_CUR
        END = io.SEEK_END
        SET = io.SEEK_SET

    def __init__(
        self,
        data: Optional[T] = None,
        read_as_bytes=False,
        fileno: Optional[int] = None,
        size_limit: Optional[int] = None,
    ) -> None:
        """
        Wrap `data`; a fresh `bytearray` is used when `data` is `None`. If `read_as_bytes`
        is set, data returned by the read methods is converted to `bytes`. The value of
        `fileno` is returned by the method of the same name. A `ValueError` is raised when
        the initial data is longer than `size_limit`.
        """
        if data is None:
            data = bytearray()
        elif size_limit is not None and len(data) > size_limit:
            raise ValueError('Initial data exceeds size limit')
        self._data = data
        self._cursor = 0
        self._closed = False
        self._fileno = fileno
        self.read_as_bytes = read_as_bytes
        self._size_limit = size_limit

    def close(self) -> None:
        self._closed = True

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self) -> MemoryFile:
        return self

    def __exit__(self, ex_type, ex_value, trace) -> bool:
        # Leaving the context neither closes the file nor suppresses exceptions.
        return False

    def flush(self) -> None:
        pass

    def isatty(self) -> bool:
        return False

    def __iter__(self):
        return self

    def __len__(self):
        return len(self._data)

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def fileno(self) -> int:
        """
        Return the file number passed to the constructor; raises `OSError` if none was given.
        """
        if self._fileno is None:
            raise OSError
        return self._fileno

    def readable(self) -> bool:
        return not self._closed

    def seekable(self) -> bool:
        return not self._closed

    @property
    def eof(self) -> bool:
        """
        Whether the file is closed or the cursor has reached the end of the data.
        """
        return self._closed or self._cursor >= len(self._data)

    @property
    def remaining_bytes(self) -> int:
        """
        The number of bytes between the cursor and the end of the data.
        """
        return len(self._data) - self.tell()

    def writable(self) -> bool:
        """
        Whether the file is open and backed by a `bytearray` or a writable `memoryview`.
        """
        if self._closed:
            return False
        if isinstance(self._data, memoryview):
            return not self._data.readonly
        return isinstance(self._data, bytearray)

    def read(self, size: int = -1, peek: bool = False) -> T:
        """
        Return up to `size` bytes starting at the cursor; everything up to the end of the
        data is returned when `size` is `None` or negative. The cursor is advanced unless
        `peek` is set.
        """
        beginning = self._cursor
        if size is None or size < 0:
            end = len(self._data)
        else:
            end = min(self._cursor + size, len(self._data))
        result = self._data[beginning:end]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        if not peek:
            self._cursor = end
        return result

    def peek(self, size: int = -1) -> memoryview:
        """
        Return a `memoryview` of up to `size` bytes starting at the cursor without moving it.
        """
        cursor = self._cursor
        mv = memoryview(self._data)
        if size is None or size < 0:
            return mv[cursor:]
        return mv[cursor:cursor + size]

    def read1(self, size: int = -1, peek: bool = False) -> T:
        return self.read(size, peek)

    def _find_linebreak(self, beginning: int, end: int) -> int:
        """
        Return the index of the first line feed in `[beginning, end)`, or `-1` if there is none.
        """
        if not isinstance(self._data, memoryview):
            return self._data.find(B'\n', beginning, end)
        # A memoryview has no find method, so it is scanned byte by byte.
        for k in range(beginning, end):
            if self._data[k] == 0xA:
                return k
        return -1

    def readline(self, size: int = -1) -> T:
        """
        Read up to and including the next line feed. If `size` is not `None` and not
        negative, at most `size` bytes are read.
        """
        beginning, end = self._cursor, len(self._data)
        if size is not None and size >= 0:
            # Clamp to the end of the data: otherwise the cursor could be moved past the
            # end of the buffer and the memoryview scan would index out of range.
            end = min(beginning + size, end)
        p = self._find_linebreak(beginning, end)
        self._cursor = end if p < 0 else p + 1
        result = self._data[beginning:self._cursor]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def readlines(self, hint: int = -1) -> Iterable[T]:
        """
        Generate the remaining lines. If `hint` is not `None` and not negative, reading
        stops once the total length of the generated lines has reached `hint`.
        """
        if hint is None or hint < 0:
            yield from self
        else:
            total = 0
            while total < hint:
                # Calling next(self) here would leak a StopIteration into the generator at
                # the end of the data, which Python converts into a RuntimeError.
                line = self.readline()
                if not line:
                    break
                total += len(line)
                yield line

    def readinto1(self, b) -> int:
        """
        Read up to `len(b)` bytes into the start of `b` and return the number of bytes read.
        """
        data = self.read(len(b))
        size = len(data)
        b[:size] = data
        return size

    def readinto(self, b) -> int:
        return self.readinto1(b)

    def tell(self) -> int:
        return self._cursor

    def seekrel(self, offset: int) -> int:
        """
        Seek relative to the current cursor position.
        """
        return self.seek(offset, io.SEEK_CUR)

    def seekset(self, offset: int) -> int:
        """
        Seek to an absolute position; a negative `offset` is taken relative to the end.
        """
        if offset < 0:
            return self.seek(offset, io.SEEK_END)
        else:
            return self.seek(offset, io.SEEK_SET)

    def getbuffer(self) -> T:
        return self._data

    def getvalue(self) -> T:
        return self._data

    def seek(self, offset: int, whence=io.SEEK_SET) -> int:
        """
        Move the cursor and return its new position, which is clamped to the range from
        zero to the length of the data. A `ValueError` is raised for a negative offset
        with `io.SEEK_SET`. Any other value of `whence` than the three `io.SEEK_*`
        constants only applies the clamping.
        """
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError('no negative offsets allowed for SEEK_SET.')
            self._cursor = offset
        elif whence == io.SEEK_CUR:
            self._cursor += offset
        elif whence == io.SEEK_END:
            self._cursor = len(self._data) + offset
        self._cursor = max(self._cursor, 0)
        self._cursor = min(self._cursor, len(self._data))
        return self._cursor

    def writelines(self, lines: Iterable[ByteString]) -> None:
        for line in lines:
            self.write(line)

    def truncate(self, size=None) -> None:
        """
        Delete all data after position `size`, or after the cursor when `size` is `None`.
        When `size` is given, the cursor is moved there; a `ValueError` is raised if it is
        not between zero and the length of the data.
        """
        if size is not None:
            if not (0 <= size <= len(self._data)):
                raise ValueError('invalid size value')
            self._cursor = size
        del self._data[self._cursor:]

    def write_byte(self, byte: int) -> None:
        """
        Write a single byte at the cursor, overwriting or appending as required. Raises
        `EOF` if this would exceed the size limit and `OSError` if the buffer rejects it.
        """
        limit = self._size_limit
        cc = self._cursor
        nc = cc + 1
        if limit and nc > limit:
            raise EOF(bytes((byte,)))
        try:
            if cc < len(self._data):
                self._data[cc] = byte
            else:
                self._data.append(byte)
        except Exception as error:
            raise OSError(str(error)) from error
        else:
            self._cursor = nc

    def write(self, data: Iterable[int]) -> int:
        """
        Write `data` at the cursor, overwriting existing bytes and extending the buffer
        where necessary. Returns the number of bytes written.

        If `data` has a length and does not fit into the size limit, `EOF` is raised before
        anything is written. If `data` is an iterable without a length, as much as the size
        limit permits is written, then `EOF` is raised with the surplus bytes; the cursor is
        not moved in that case. Failures to store sized data are raised as `OSError`.
        """
        out = self._data
        end = len(out)
        beginning = self._cursor
        limit = self._size_limit

        if limit is None and beginning == end:
            # Fast path: unbounded append at the end of the buffer.
            out[end:] = data
            self._cursor = end = len(out)
            return end - beginning

        try:
            size = len(data)
        except Exception:
            # No length available; fall through to the incremental path below.
            pass
        else:
            if limit and size + beginning > limit:
                raise EOF(data)
            self._cursor += size
            try:
                self._data[beginning:self._cursor] = data
            except Exception as error:
                self._cursor = beginning
                raise OSError(str(error)) from error
            return size

        it = iter(data)
        cursor = beginning
        # First overwrite the bytes that already exist between the cursor and the end.
        for b in itertools.islice(it, end - beginning):
            out[cursor] = b
            cursor += 1
        if cursor < end:
            # The input ran out before the end of the existing data was reached.
            self._cursor = cursor
            return cursor - beginning
        # Then append whatever is left, honoring the size limit if there is one.
        if limit is None:
            out[end:] = it
        else:
            out[end:] = itertools.islice(it, max(limit - end, 0))
            surplus = bytearray(it)
            if surplus:
                raise EOF(surplus)
        self._cursor = len(out)
        return self._cursor - beginning

    def __getitem__(self, slice):
        result = self._data[slice]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def replay(self, offset: int, length: int):
        """
        Write `length` bytes at the cursor which are obtained by repeating the `offset`
        bytes that immediately precede the cursor. Raises a `ValueError` unless `offset`
        is between one and the current cursor position.
        """
        # An offset of zero is rejected as well: there is nothing to repeat and the
        # division below would fail.
        if offset not in range(1, self._cursor + 1):
            raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{self._cursor}].')
        rep, r = divmod(length, offset)
        # Express the start of the window as a negative index relative to the end of the data.
        offset = -offset - len(self) + self._cursor
        replay = self._data[offset:offset + r]
        if rep > 0:
            replay = bytes(self._data[offset:self._cursor]) * rep + replay
        self.write(replay)
class order(str, enum.Enum):
    """
    Byte order names; each member is a string whose value is the character `>` (big) or
    `<` (little).
    """
    big = '>'
    little = '<'
class StructReader(MemoryFile[T]):
    """
    An extension of a `refinery.lib.structures.MemoryFile` which provides methods to
    read structured data.
    """

    class Unaligned(RuntimeError):
        pass

    def __init__(self, data: T, bigendian: bool = False):
        super().__init__(data)
        # The bit buffer: _nbits is the number of pending bits, _bbits holds their value.
        self._bbits = 0
        self._nbits = 0
        self.bigendian = bigendian

    def __enter__(self) -> StructReader:
        return super().__enter__()

    @property
    @contextlib.contextmanager
    def be(self):
        """
        A context manager which switches the reader to big endian mode. The byte order
        that was active before is restored when the context is left.
        """
        previous = self.bigendian
        self.bigendian = True
        try:
            yield self
        finally:
            # Restore rather than reset, so that a big endian reader stays big endian.
            self.bigendian = previous

    @property
    def byteorder_format(self) -> str:
        return '>' if self.bigendian else '<'

    @property
    def byteorder_name(self) -> str:
        return 'big' if self.bigendian else 'little'

    def seek(self, offset, whence=io.SEEK_SET) -> int:
        # Seeking always discards the bit buffer.
        self._bbits = 0
        self._nbits = 0
        return super().seek(offset, whence)

    def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> T:
        """
        Read bytes from the underlying stream. Raises a `RuntimeError` when the stream is not currently
        byte-aligned, i.e. when `refinery.lib.structures.StructReader.byte_aligned` is `False`. Raises
        an exception of type `refinery.lib.structures.EOF` when fewer data is available in the stream than
        requested via the `size` parameter. The remaining data can be extracted from the exception.
        Use `refinery.lib.structures.StructReader.read_bytes` to read bytes from the stream when it is
        not byte-aligned.
        """
        if not self.byte_aligned:
            raise StructReader.Unaligned('buffer is not byte-aligned')
        data = self.read1(size, peek)
        if size and len(data) < size:
            raise EOF(data)
        return data

    @property
    def byte_aligned(self) -> bool:
        """
        This property is `True` if and only if there are currently no bits still waiting in the internal
        bit buffer.
        """
        return not self._nbits

    def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
        """
        This method clears the internal bit buffer and moves the cursor forward to the next multiple
        of `blocksize`, if it is not already at one. It returns a tuple containing the size and contents
        of the bit buffer.
        """
        nbits = self._nbits
        bbits = self._bbits
        self._nbits = 0
        self._bbits = 0
        mod = self._cursor % blocksize
        self.seekrel(mod and blocksize - mod)
        return nbits, bbits

    def read_integer(self, length: int, peek: bool = False) -> int:
        """
        Read `length` many bits from the underlying stream as an integer. If `peek` is set, neither
        the cursor nor the internal bit buffer are changed.
        """
        if length < self._nbits:
            # The request can be served from the bit buffer alone.
            new_count = self._nbits - length
            if self.bigendian:
                result = self._bbits >> new_count
                if not peek:
                    self._bbits ^= result << new_count
            else:
                result = self._bbits & 2 ** length - 1
                if not peek:
                    self._bbits >>= length
            if not peek:
                self._nbits = new_count
            return result

        nbits, bbits = self.byte_align()
        number_of_missing_bits = length - nbits
        bytecount, rest = divmod(number_of_missing_bits, 8)
        if rest:
            bytecount += 1
            rest = 8 - rest
        try:
            chunk = self.read_exactly(bytecount, peek)
        finally:
            if peek:
                # Aligning emptied the bit buffer; a peek has to put the pending bits back.
                self._nbits = nbits
                self._bbits = bbits
        if bytecount == 1:
            result, = chunk
        else:
            result = int.from_bytes(chunk, self.byteorder_name)
        if not nbits and not rest:
            return result
        if self.bigendian:
            rbmask   = 2 ** rest - 1                     # noqa
            excess   = result & rbmask                   # noqa
            result >>= rest                              # noqa
            result  ^= bbits << number_of_missing_bits   # noqa
        else:
            excess   = result >> number_of_missing_bits  # noqa
            result  ^= excess << number_of_missing_bits  # noqa
            result <<= nbits                             # noqa
            result  |= bbits                             # noqa
        assert excess.bit_length() <= rest
        if not peek:
            self._nbits = rest
            self._bbits = excess
        return result

    def read_bytes(self, size: int, peek: bool = False) -> bytes:
        """
        The method reads `size` many bytes from the underlying stream starting at the current bit.
        """
        if self.byte_aligned:
            data = self.read_exactly(size, peek)
            if not isinstance(data, bytes):
                data = bytes(data)
            return data
        else:
            return self.read_integer(size * 8, peek).to_bytes(size, self.byteorder_name)

    def read_bit(self) -> int:
        """
        This function is a shortcut for calling `refinery.lib.structures.StructReader.read_integer` with
        an argument of `1`, i.e. this reads the next bit from the stream. The bits of any byte in the stream
        are read from least significant to most significant.
        """
        return self.read_integer(1)

    def read_bits(self, nbits: int) -> Iterable[int]:
        """
        This method returns the bits of `refinery.lib.structures.StructReader.read_integer` as an iterable
        from most to least significant.
        """
        chunk = self.read_integer(nbits)
        for k in range(nbits - 1, -1, -1):
            yield chunk >> k & 1

    def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
        """
        Identical to `refinery.lib.structures.StructReader.read_bits` with every bit value cast to a boolean.
        If `reverse` is set, the flags are generated in the opposite order.
        """
        bits = list(self.read_bits(nbits))
        if reverse:
            bits.reverse()
        for bit in bits:
            yield bool(bit)

    def read_struct(self, spec: str, unwrap=False, peek=False) -> Union[List[UnpackType], UnpackType]:
        """
        Read structured data from the stream in any format supported by the `struct` module. A byte order
        character at the start of `spec` can be used to override the current byte ordering. If the `unwrap`
        parameter is `True`, a single unpacked value will be returned as a scalar, not as a list with one
        element. If `peek` is set, the cursor and the bit buffer are restored after reading. In addition
        to the `struct` format characters, the following ones are supported, each with an optional
        decimal repeat count in front:

        - `a`: `refinery.lib.structures.StructReader.read_c_string`
        - `u`: `refinery.lib.structures.StructReader.read_w_string`
        - `w`: `refinery.lib.structures.StructReader.read_w_string`, decoded as `utf-16le`
        - `g`: `refinery.lib.structures.StructReader.read_guid`
        - `E`: `refinery.lib.structures.StructReader.read_7bit_encoded_int`
        """
        if not spec:
            raise ValueError('no format specified')
        byteorder = spec[:1]
        if byteorder in '<!=@>':
            spec = spec[1:]
        else:
            byteorder = self.byteorder_format
        data = []
        current_cursor = self.tell()
        pending_bits = self._nbits, self._bbits
        # reserved struct characters: xcbB?hHiIlLqQnNefdspP
        for k, part in enumerate(re.split('(\\d*[auwgE])', spec)):
            if k % 2 == 1:
                # Odd parts are the captured custom codes, even parts are struct formats.
                count = 1 if len(part) == 1 else int(part[:~0])
                part = part[~0]
                for _ in range(count):
                    if part == 'a':
                        data.append(self.read_c_string())
                    elif part == 'g':
                        data.append(self.read_guid())
                    elif part == 'u':
                        data.append(self.read_w_string())
                    elif part == 'w':
                        data.append(self.read_w_string().decode('utf-16le'))
                    elif part == 'E':
                        data.append(self.read_7bit_encoded_int())
            else:
                part = F'{byteorder}{part}'
                data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
        if peek:
            # This has to happen before any return statement, or peeking would consume
            # data. Seeking clears the bit buffer, so it is restored by hand.
            self.seekset(current_cursor)
            self._nbits, self._bbits = pending_bits
        if unwrap and len(data) == 1:
            return data[0]
        return data

    def read_nibble(self, peek: bool = False) -> int:
        """
        Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`.
        """
        return self.read_integer(4, peek)

    # Shortcuts for unsigned (u), signed (i), and floating point (f) values of fixed bit size.
    def u8(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def i8(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def u16(self, peek: bool = False) -> int: return self.read_integer(16, peek)
    def u32(self, peek: bool = False) -> int: return self.read_integer(32, peek)
    def u64(self, peek: bool = False) -> int: return self.read_integer(64, peek)
    def i16(self, peek: bool = False) -> int: return signed(self.read_integer(16, peek), 16)
    def i32(self, peek: bool = False) -> int: return signed(self.read_integer(32, peek), 32)
    def i64(self, peek: bool = False) -> int: return signed(self.read_integer(64, peek), 64)

    def f32(self, peek: bool = False) -> float: return self.read_struct('f', unwrap=True, peek=peek)
    def f64(self, peek: bool = False) -> float: return self.read_struct('d', unwrap=True, peek=peek)

    def read_byte(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def read_char(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
        """
        Read data up to the first occurrence of `terminator` whose distance from the current position
        is a multiple of `alignment`. The terminator is skipped and not part of the returned data.

        NOTE(review): The two code paths disagree when there is no terminator. If the buffer has a
        `find` method, all remaining data is returned; otherwise the cursor is restored and
        `refinery.lib.structures.EOF` is raised. Confirm which one callers rely on before unifying.
        """
        pos = self.tell()
        buf = self.getbuffer()
        try:
            end = pos - 1
            while True:
                end = buf.find(terminator, end + 1)
                if end < 0 or not (end - pos) % alignment:
                    break
        except AttributeError:
            # The buffer cannot be searched directly: read it in aligned chunks instead.
            result = bytearray()
            while not self.eof:
                result.extend(self.read_bytes(alignment))
                if result.endswith(terminator):
                    return result[:-len(terminator)]
            self.seek(pos)
            raise EOF
        else:
            # When nothing was found, the size passed here is negative and everything is read.
            data = self.read_exactly(end - pos)
            self.seekrel(len(terminator))
            return bytearray(data)

    def read_guid(self) -> str:
        """
        Read 16 bytes and format them as an uppercase GUID string with five groups of 8, 4, 4, 4,
        and 12 hexadecimal digits. The first three groups are always read as little endian integers,
        the last two are the hexadecimal representation of the bytes as stored.
        """
        _mode = self.bigendian
        self.bigendian = False
        try:
            a = self.u32()
            b = self.u16()
            c = self.u16()
            d = self.read(2).hex().upper()
            e = self.read(6).hex().upper()
        finally:
            self.bigendian = _mode
        # The second and third group are 16 bits wide and need four digits each.
        return F'{a:08X}-{b:04X}-{c:04X}-{d}-{e}'

    def read_c_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string that is terminated by a single null byte. The result is decoded if an
        `encoding` is given.
        """
        data = self.read_terminated_array(B'\0')
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_w_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string that is terminated by two null bytes at an even offset from the current
        position. The result is decoded if an `encoding` is given.
        """
        data = self.read_terminated_array(B'\0\0', 2)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_length_prefixed_ascii(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'latin1')

    def read_length_prefixed_utf8(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'utf8')

    def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
        # Unless the prefix already counts bytes, it counts code units of two bytes each.
        block_size = 1 if bytecount else 2
        return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)

    def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
        """
        Read an integer of `prefix_size` bits, multiply it by `block_size`, and read that many bytes.
        The result is decoded if an `encoding` is given.
        """
        prefix = self.read_integer(prefix_size) * block_size
        data = self.read(prefix)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_7bit_encoded_int(self, max_bits: int = 0) -> int:
        """
        Read an integer that is stored in groups of 7 bits, least significant group first; the high
        bit of each byte indicates whether another byte follows. If `max_bits` is positive, a
        `RuntimeError` is raised when a continuation bit is set at a shift greater than `max_bits`.
        """
        value = 0
        for shift in itertools.count(0, step=7):
            b = self.read_byte()
            value |= (b & 0x7F) << shift
            if not b & 0x80:
                return value
            if shift > max_bits > 0:
                raise RuntimeError('Maximum bits were exceeded by encoded integer.')
class StructMeta(type):
    """
    A metaclass to facilitate the behavior outlined for `refinery.lib.structures.Struct`.
    The initializer of every class created by it is wrapped so that its first argument
    is converted to the `parser` type, and so that the data consumed by the initializer
    is stored in the attribute `_data` of the new object.
    """

    def __new__(mcls, name, bases, nmspc, parser=StructReader):
        # The parser keyword is consumed here; the type constructor does not accept it.
        return type.__new__(mcls, name, bases, nmspc)

    def __init__(cls, name, bases, nmspc, parser=StructReader):
        super().__init__(name, bases, nmspc)
        inner_init = cls.__init__

        @functools.wraps(inner_init)
        def initializer(self: Struct, reader, *args, **kwargs):
            if isinstance(reader, parser):
                source = reader
            elif issubclass(parser, reader.__class__):
                # An instance of a base class of the parser is rejected, not wrapped.
                raise ValueError(
                    F'A reader of type {reader.__class__.__name__} was passed to {cls.__name__}, '
                    F'but a {parser.__name__} is required.')
            else:
                source = parser(reader)
            begin = source.tell()
            window = memoryview(source.getbuffer())
            inner_init(self, source, *args, **kwargs)
            # Everything between the old and the new cursor position was parsed.
            self._data = window[begin:source.tell()]

        cls.__init__ = initializer
class Struct(metaclass=StructMeta):
    """
    A class to parse structured data. A `refinery.lib.structures.Struct` class can be instantiated
    as follows:

        foo = Struct(data, bar=29)

    The initialization routine of the structure will be called with a single argument `reader`. If
    the object `data` is already a `refinery.lib.structures.StructReader`, then it will be passed
    as `reader`. Otherwise, the argument will be wrapped in a `refinery.lib.structures.StructReader`.
    Additional arguments to the struct are passed through.
    """

    _data: Union[memoryview, bytearray]

    def __init__(self, reader: StructReader, *args, **kwargs):
        """
        The base class reads nothing from the reader.
        """

    def __len__(self):
        return len(self._data)

    def __bytes__(self):
        return bytes(self._data)

    def get_data(self, decouple=False):
        """
        Return the data that was parsed for this structure. If `decouple` is set and the data
        is a `memoryview`, it is first replaced by a `bytearray` copy.
        """
        data = self._data
        if decouple and isinstance(data, memoryview):
            data = self._data = bytearray(data)
        return data
AttrType = TypeVar('AttrType')


class PerInstanceAttribute(Generic[AttrType]):
    """
    A descriptor which keeps one value per owner instance, keyed by the `id` of the
    instance. Only the first value assigned for an instance is stored; it is passed
    through `resolve` on the first read and the result is cached. The entries of an
    instance are removed by a finalizer that is registered for it on assignment.
    """

    def __init__(self):
        self.__set: Dict[int, Any] = {}
        self.__get: Dict[int, AttrType] = {}

    def resolve(self, parent, value: Any) -> AttrType:
        """
        Convert the assigned value to the value that is returned on read access. The
        default implementation returns the value as it is.
        """
        return value

    def __set__(self, parent: Any, value: Any) -> None:
        key = id(parent)
        if key in self.__set:
            # A value was already assigned for this instance and is kept.
            return

        def discard(descriptor, key):
            descriptor.__set.pop(key, None)
            descriptor.__get.pop(key, None)

        self.__set[key] = value
        weakref.finalize(parent, discard, self, id(parent))

    def __get__(self, parent, tp=None) -> AttrType:
        key = id(parent)
        cache = self.__get
        if key in cache:
            return cache[key]
        try:
            seed = self.__set[key]
        except KeyError as K:
            raise AttributeError from K
        cache[key] = self.resolve(parent, seed)
        return cache[key]