/
lznt1.py
148 lines (139 loc) · 5.22 KB
/
lznt1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import io
import struct
import copy
from refinery.units import Arg, Unit, RefineryPartialResult
class lznt1(Unit):
"""
LZNT1 compression and decompression. This compression algorithm is expected
by the Win32 API routine `RtlDecompressBuffer`, for example.
"""
def _decompress_chunk(self, chunk):
out = B''
while chunk:
flags = chunk[0]
chunk = chunk[1:]
for i in range(8):
if not (flags >> i & 1):
out += chunk[:1]
chunk = chunk[1:]
else:
flag = struct.unpack('<H', chunk[:2])[0]
pos = len(out) - 1
l_mask = 0xFFF
o_shift = 12
while pos >= 0x10:
l_mask >>= 1
o_shift -= 1
pos >>= 1
length = (flag & l_mask) + 3
offset = (flag >> o_shift) + 1
if length >= offset:
tmp = out[-offset:] * (0xFFF // len(out[-offset:]) + 1)
out += tmp[:length]
else:
out += out[-offset:length - offset]
chunk = chunk[2:]
if len(chunk) == 0:
break
return out
def _find(self, src, target, max_len):
result_offset = 0
result_length = 0
for i in range(1, max_len):
offset = src.rfind(target[:i])
if offset == -1:
break
tmp_offset = len(src) - offset
tmp_length = i
if tmp_offset == tmp_length:
tmp = src[offset:] * (0xFFF // len(src[offset:]) + 1)
for j in range(i, max_len + 1):
offset = tmp.rfind(target[:j])
if offset == -1:
break
tmp_length = j
if tmp_length > result_length:
result_offset = tmp_offset
result_length = tmp_length
if result_length < 3:
return 0, 0
return result_offset, result_length
def _compress_chunk(self, chunk):
blob = copy.copy(chunk)
out = B''
pow2 = 0x10
l_mask3 = 0x1002
o_shift = 12
while len(blob) > 0:
bits = 0
tmp = B''
for i in range(8):
bits >>= 1
while pow2 < (len(chunk) - len(blob)):
pow2 <<= 1
l_mask3 = (l_mask3 >> 1) + 1
o_shift -= 1
if len(blob) < l_mask3:
max_len = len(blob)
else:
max_len = l_mask3
offset1, length1 = self._find(
chunk[:len(chunk) - len(blob)], blob, max_len)
# try to find more compressed pattern
offset2, length2 = self._find(
chunk[:len(chunk) - len(blob) + 1], blob[1:], max_len)
if length1 < length2:
length1 = 0
if length1 > 0:
symbol = ((offset1 - 1) << o_shift) | (length1 - 3)
tmp += struct.pack('<H', symbol)
bits |= 0x80 # set the highest bit
blob = blob[length1:]
else:
tmp += blob[:1]
blob = blob[1:]
if len(blob) == 0:
break
out += struct.pack('B', bits >> (7 - i))
out += tmp
return out
def reverse(self, buf):
    """
    Compress `buf` into a sequence of chunks and return the result as bytes.

    The input is cut into pieces of `self.args.chunk_size` bytes. Each piece is
    passed to `self._compress_chunk`; if the result is not smaller than the piece
    itself, the piece is stored verbatim instead. Every chunk starts with a
    little-endian 16-bit header whose low 12 bits hold the payload size minus one,
    combined with 0xB000 for a compressed payload or 0x3000 for a stored one.

    Raises:
        ValueError: if the chunk size is not in the range 1 to 0x1000. A size of
            zero would never consume any input, and a larger size does not fit
            into the 12-bit size field of the chunk header.
    """
    chunk_size = self.args.chunk_size
    if not 1 <= chunk_size <= 0x1000:
        raise ValueError(
            F'invalid chunk size {chunk_size}, it must be between 1 and 0x1000.')
    parts = []
    for start in range(0, len(buf), chunk_size):
        chunk = buf[start:start + chunk_size]
        compressed = self._compress_chunk(chunk)
        if len(compressed) < len(chunk):
            flags, payload = 0xB000, compressed
        else:
            flags, payload = 0x3000, chunk
        parts.append(struct.pack('<H', flags | (len(payload) - 1)))
        parts.append(payload)
    return b''.join(parts)
def process(self, data):
    """
    Decompress a sequence of chunks and return the concatenated result as bytes.

    Each chunk starts with a little-endian 16-bit header. Its low 12 bits hold the
    payload size minus one. If bit 0x8000 is set, the payload is passed through
    `self._decompress_chunk`; otherwise it is copied to the output unchanged.

    Raises:
        RefineryPartialResult: if the input ends inside a chunk header, or if a
            header announces more payload bytes than are left in the input. The
            data decoded up to that point is attached as the partial result.
    """
    out = io.BytesIO()
    offset = 0
    total = len(data)
    while offset < total:
        try:
            header, = struct.unpack('<H', data[offset:offset + 2])
        except struct.error as err:
            raise RefineryPartialResult(str(err), partial=out.getvalue()) from err
        offset += 2
        size = (header & 0xFFF) + 1
        # Compare against what is left after this header, not against the whole
        # input, so that a truncated chunk is detected at any position.
        remaining = total - offset
        if size > remaining:
            raise RefineryPartialResult(
                F'chunk header indicates size {size}, but only {remaining} bytes remain.',
                partial=out.getvalue()
            )
        chunk = data[offset:offset + size]
        offset += size
        if header & 0x8000:
            chunk = self._decompress_chunk(chunk)
        out.write(chunk)
    return out.getvalue()
def __init__(
    self,
    chunk_size: Arg.Number(
        '-c',
        help='Optionally specify the chunk size for compression, default is 0x1000.'
    ) = 0x1000
):
    # The chunk size is handed to the base class initializer as a keyword argument;
    # the compression code reads it back as self.args.chunk_size.
    super().__init__(chunk_size=chunk_size)