/
decompress.py
180 lines (162 loc) · 7.07 KB
/
decompress.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import ByteString, List, NamedTuple, Optional
from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.types import INF
from .ap import aplib # noqa
from .blz import blz # noqa
from .bz2 import bz2 # noqa
from .jcalg import jcalg # noqa
from .lz import lzma # noqa
from .lz4 import lz4 # noqa
from .lzjb import lzjb # noqa
from .lznt1 import lznt1 # noqa
from .lzo import lzo # noqa
from .szdd import szdd # noqa
from .zl import zl # noqa
from .qlz import qlz # noqa
from .lzf import lzf # noqa
from .lzw import lzw # noqa
class decompress(Unit):
    """
    Attempts all available decompression units against the input and returns
    the output of the first successful one. If none succeeds, the data is
    returned unaltered. The process is heavily biased against LZNT1 decompression
    due to a large tendency for LZNT1 false positives.
    """
    def __init__(
        self,
        prepend: Arg.Switch('-P', '--no-prepend', off=True, help=(
            'By default, if decompression fails, the unit attempts to prefix '
            'the data with all possible values of a single byte and decompress '
            'the result. This behavior can be disabled with this flag.')
        ) = True,
        tolerance: Arg.Number('-t', help=(
            'Maximum number of bytes to strip from the beginning of the data; '
            'The default value is 12.')
        ) = 12,
        max_ratio: Arg('-m', metavar='R', help=(
            'To determine whether a decompression algorithm was successful, the '
            'ratio of compressed size to decompressed size may at most be as large '
            'as this number, a floating point value R; default value is {default}.')
        ) = 1,
        min_ratio: Arg('-n', metavar='R', help=(
            'Require that compression ratios must be at least as large as R. This '
            'is a "too good to be true" heuristic against algorithms like lznt1 '
            'that can produce false positives. The default is {default}.')
        ) = 0.0001,
    ):
        # Zero is rejected together with negative values, so the requirement
        # is strict positivity and the message says so.
        if min_ratio <= 0:
            raise ValueError('The compression factor must be positive.')
        super().__init__(
            tolerance=tolerance,
            prepend=prepend,
            min_ratio=min_ratio,
            max_ratio=max_ratio
        )
        # One assembled instance per engine, in the order they are attempted.
        self.engines: List[Unit] = [
            engine.assemble() for engine in [
                szdd, zl, lzma, aplib, qlz, lzf, lzw, jcalg, bz2, blz, lzjb, lz4, lzo, lznt1]
        ]
        for engine in self.engines:
            engine.log_detach()

    def process(self, data):
        """
        Run every engine against the input, both as-is and with leading bytes
        stripped; optionally also with every possible single byte prepended.
        Candidates are ranked by the ratio of input size to output size.

        Returns the labelled result of the best attempt that completed without
        error; failing that, the best partial result; failing that, the input
        itself (as a memoryview).
        """
        data = memoryview(data)

        class Decompression(NamedTuple):
            # Record of a single decompression attempt. The class is defined
            # inside process() because `ratio` closes over the local `data`.
            engine: Unit
            result: Optional[ByteString] = None
            cutoff: int = 0                 # number of leading bytes stripped
            prefix: Optional[int] = None    # byte value prepended, if any
            failed: bool = False            # True for partial or missing results

            def __str__(self):
                status = 'partial' if self.failed else 'success'
                prefix = self.prefix
                if prefix is not None:
                    prefix = F'0x{prefix:02X}'
                return F'prefix={prefix}, cutoff=0x{self.cutoff:02X}, [{status}] engine={self.engine.name}'

            def __len__(self):
                # The length (and therefore the truth value) of an attempt is
                # the length of its output.
                return len(self.result)

            @property
            def ratio(self):
                # Input size divided by output size; INF when there is no output.
                if not self.result:
                    return INF
                return len(data) / len(self.result)

            @property
            def unmodified(self):
                # The input counts as unmodified only when nothing was
                # prepended and nothing was stripped. Prefixed attempts always
                # use cutoff 0, so checking the cutoff alone would misclassify
                # them as unmodified.
                return self.prefix is None and self.cutoff == 0

            @property
            def method(self):
                return self.engine.name

        if self.args.prepend:
            # Scratch buffer holding the input shifted by one byte; slot 0 is
            # overwritten with each candidate prefix in decompress().
            buffer = bytearray(1 + len(data))
            buffer[1:] = data

        best_only_success: Optional[Decompression] = None
        best_with_failure: Optional[Decompression] = None

        def decompress(engine: Unit, cutoff: int = 0, prefix: Optional[int] = None):
            # Run one engine on the input with `cutoff` leading bytes removed,
            # or, when `prefix` is given, on the input with that byte prepended.
            ingest = data[cutoff:]
            failed = True
            if prefix is not None:
                buffer[0] = prefix
                ingest = buffer
            if engine.handles(ingest) is False:
                # The engine explicitly rejects this input; record an empty,
                # non-failed attempt (its ratio is INF).
                return Decompression(engine, None, cutoff, prefix)
            try:
                result = engine.process(ingest)
            except RefineryPartialResult as pr:
                result = pr.partial
            except Exception:
                # Best effort: an engine that raises simply has no result.
                result = None
            else:
                failed = False
            return Decompression(engine, result, cutoff, prefix, failed)

        def update(
            new: Decompression,
            best: Optional[Decompression] = None,
            discard_if_too_good=False
        ) -> Optional[Decompression]:
            # Return whichever of `new` and `best` should be kept.
            ratio = new.ratio
            if ratio > self.args.max_ratio:
                return best
            if ratio < self.args.min_ratio:
                return best
            # An unmodified input only has to match a modified best (r = 1);
            # every other candidate has to beat the best ratio by 5%.
            r = 1 if new.unmodified and best and not best.unmodified else 0.95
            # NOTE(review): a candidate whose output is shorter than the current
            # best is assigned q = 0 and therefore passes the q < r test below;
            # confirm that this is intended.
            if not best or len(new) < len(best):
                q = 0
            else:
                q = ratio / best.ratio
            if q < r:
                if best and discard_if_too_good:
                    # For engines flagged as careful, an existing best is kept
                    # when the new ratio is less than half of it, or when the
                    # new attempt only produced a partial result.
                    if q < 0.5:
                        return best
                    if new.failed:
                        return best
                self.log_info(lambda: F'obtained {ratio * 100:07.4f}% compression ratio [q={q:07.4f}] with: {new!s}')
                return new
            else:
                self.log_debug(F'obtained {ratio * 100:07.4f}% compression ratio [q={q:07.4f}] with: {new!s}')
                return best

        for engine in self.engines:
            self.log_debug(F'attempting engine: {engine.name}')
            # These engines are passed to update() with discard_if_too_good set.
            careful = isinstance(engine, (lznt1, lzf, lzjb))
            for t in range(self.args.tolerance):
                # Once any success exists, careful engines are only tried
                # against the input without stripping.
                if best_only_success and careful and t > 0:
                    break
                dc = decompress(engine, t)
                if not dc.failed:
                    best_only_success = update(dc, best_only_success, careful)
                else:
                    best_with_failure = update(dc, best_with_failure, careful)
            if self.args.prepend and not best_only_success:
                # Nothing has succeeded so far: try every single-byte prefix.
                for p in range(0x100):
                    dc = decompress(engine, 0, p)
                    if not dc.failed:
                        best_only_success = update(dc, best_only_success, careful)
                    else:
                        best_with_failure = update(dc, best_with_failure, careful)

        if best_only_success is not None:
            return self.labelled(best_only_success.result, method=best_only_success.method)
        if best_with_failure is not None:
            self.log_info('the only decompression with result returned only a partial result.')
            return self.labelled(best_with_failure.result, method=best_with_failure.method)
        self.log_warn('no compression engine worked, returning original data.')
        return data