-
Notifications
You must be signed in to change notification settings - Fork 90
/
replacement_frontend.py
304 lines (255 loc) · 11 KB
/
replacement_frontend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import logging
import numbers
import weakref
from claripy.ast.base import Base
from claripy.ast.bool import BoolV, false
from claripy.ast.bv import BVV
from claripy.backend_manager import backends
from claripy.balancer import Balancer
from claripy.errors import BackendError, ClaripyFrontendError
from .constrained_frontend import ConstrainedFrontend
# Module-level logger for the replacement frontend.
l = logging.getLogger("claripy.frontends.replacement_frontend")
class ReplacementFrontend(ConstrainedFrontend):
    """Constrained frontend that rewrites ASTs before delegating to another frontend.

    Expressions and constraints are passed through a replacement table and the
    rewritten versions are forwarded to the wrapped ``actual_frontend``.

    Two mappings are maintained, both keyed by an AST's ``cache_key``:

    * ``_replacements`` -- a plain dict of the explicitly registered
      replacements (this is what gets pickled).
    * ``_replacement_cache`` -- a ``weakref.WeakKeyDictionary`` holding the
      registered replacements plus memoized results of rewriting larger ASTs.
    """

    def __init__(
        self,
        actual_frontend,
        allow_symbolic=None,
        replacements=None,
        replacement_cache=None,
        unsafe_replacement=None,
        complex_auto_replace=None,
        auto_replace=None,
        replace_constraints=None,
        **kwargs,
    ):
        """Create a replacement frontend wrapping ``actual_frontend``.

        :param actual_frontend:      Frontend that receives the rewritten queries.
        :param allow_symbolic:       If false, ``add`` raises when a constraint is
                                     still symbolic after replacement. Defaults to True.
        :param replacements:         Initial ``cache_key -> AST`` dict. Defaults to ``{}``.
        :param replacement_cache:    Initial lookup cache. Defaults to an empty
                                     ``WeakKeyDictionary``.
        :param unsafe_replacement:   If true, solve results are recorded as
                                     replacements. Defaults to False.
        :param complex_auto_replace: If true, ``add`` uses the ``Balancer`` to derive
                                     replacements. Defaults to False.
        :param auto_replace:         If true, ``add`` derives replacements from
                                     added constraints. Defaults to True.
        :param replace_constraints:  If true, ``_concrete_constraint`` works on the
                                     rewritten constraint. Defaults to False.
        :param kwargs:               Forwarded to the parent constructor.
        """
        super().__init__(**kwargs)

        self._actual_frontend = actual_frontend
        self._allow_symbolic = True if allow_symbolic is None else allow_symbolic
        self._auto_replace = True if auto_replace is None else auto_replace
        self._complex_auto_replace = False if complex_auto_replace is None else complex_auto_replace
        self._replace_constraints = False if replace_constraints is None else replace_constraints
        self._unsafe_replacement = False if unsafe_replacement is None else unsafe_replacement
        self._replacements = {} if replacements is None else replacements
        # NOTE(review): when only `replacements` is supplied the cache starts
        # empty, so those entries are not consulted until the cache is rebuilt.
        self._replacement_cache = weakref.WeakKeyDictionary() if replacement_cache is None else replacement_cache

        self._validation_frontend = None

    def _blank_copy(self, c):
        """Initialise ``c`` with this frontend's settings but no replacements."""
        super()._blank_copy(c)
        c._actual_frontend = self._actual_frontend.blank_copy()
        c._allow_symbolic = self._allow_symbolic
        c._auto_replace = self._auto_replace
        c._complex_auto_replace = self._complex_auto_replace
        c._replace_constraints = self._replace_constraints
        c._unsafe_replacement = self._unsafe_replacement
        c._replacements = {}
        c._replacement_cache = weakref.WeakKeyDictionary()
        if self._validation_frontend is not None:
            c._validation_frontend = self._validation_frontend.blank_copy()
        else:
            c._validation_frontend = None

    def _copy(self, c):
        """Copy solver state into ``c``, giving it its own replacement mappings."""
        super()._copy(c)
        self._actual_frontend._copy(c._actual_frontend)
        if self._validation_frontend is not None:
            self._validation_frontend._copy(c._validation_frontend)
        # Shallow copies: later additions on either side do not leak to the other.
        c._replacements = dict(self._replacements)
        c._replacement_cache = weakref.WeakKeyDictionary(self._replacement_cache)

    #
    # Replacements
    #

    def add_replacement(self, old, new, invalidate_cache=True, replace=True, promote=True):
        """Register ``new`` as the replacement for ``old``.

        The call is a silent no-op when ``old`` is not an AST, when ``old is new``,
        or when ``new`` is neither an AST, a bool nor a number.

        :param old:              AST to be replaced.
        :param new:              Replacement; bools become ``BoolV`` and other
                                 numbers become ``BVV`` of ``old.length`` bits.
        :param invalidate_cache: If true, rebuild the cache from the registered
                                 replacements first, dropping memoized rewrites.
        :param replace:          If false, keep an already registered replacement.
        :param promote:          If false, do nothing when ``old`` already has a
                                 cache entry.
        """
        if not isinstance(old, Base):
            return

        if old is new:
            return

        if not replace and old.cache_key in self._replacements:
            return
        if not promote and old.cache_key in self._replacement_cache:
            return

        if not isinstance(new, Base):
            # bool must be tested first: bool is itself a numbers.Number
            if isinstance(new, bool):
                new = BoolV(new)
            elif isinstance(new, numbers.Number):
                new = BVV(new, old.length)
            else:
                return

        if invalidate_cache:
            self._replacements = dict(self._replacements)
            self._replacement_cache = weakref.WeakKeyDictionary(self._replacements)

        self._replacements[old.cache_key] = new
        self._replacement_cache[old.cache_key] = new

    def remove_replacements(self, old_entries):
        """Drop every replacement whose key is in ``old_entries`` and rebuild the cache."""
        self._replacements = {k: v for k, v in self._replacements.items() if k not in old_entries}
        self._replacement_cache = weakref.WeakKeyDictionary(self._replacements)

    def clear_replacements(self):
        """Forget all replacements and reset the cache."""
        self._replacements = {}
        self._replacement_cache = weakref.WeakKeyDictionary(self._replacements)

    def _replacement(self, old):
        """Return ``old`` rewritten through the replacement cache.

        Non-AST values, and any value when the cache is empty, are returned
        unchanged. Rewrites that differ from ``old`` are memoized in the cache.
        """
        # depressing hack: retry the emptiness check once on RuntimeError
        # (presumably raised when the weak dictionary changes underneath the
        # check -- TODO confirm)
        try:
            if not self._replacement_cache:
                return old
        except RuntimeError:
            if not self._replacement_cache:
                return old

        if not isinstance(old, Base):
            return old

        try:
            return self._replacement_cache[old.cache_key]
        except KeyError:
            # not found in the cache
            new = old.replace_dict(self._replacement_cache)
            if new is not old:
                self._replacement_cache[old.cache_key] = new
            return new

    def _add_solve_result(self, e, er, r):
        """Record solve result ``r`` as the replacement for ``e`` when permitted.

        :param e:  Original expression that was queried.
        :param er: ``e`` after replacement; nothing is recorded if it is symbolic.
        :param r:  Result to register as the replacement for ``e``.
        """
        if not self._auto_replace:
            return
        if not isinstance(e, Base) or not e.symbolic:
            return
        if er.symbolic:
            return
        self.add_replacement(e, r, invalidate_cache=False)

    #
    # Storable support
    #

    def downsize(self):
        """Downsize the wrapped frontend and drop memoized rewrites.

        The registered replacements are re-seeded into the cache afterwards.
        ``_replacement`` only consults the cache, so leaving it empty would
        silently disable every registered replacement.
        """
        self._actual_frontend.downsize()
        # Clear in place (keeps the cache object's identity), then restore the
        # explicit replacements so lookups keep working.
        self._replacement_cache.clear()
        self._replacement_cache.update(self._replacements)

    def __getstate__(self):
        """Return the picklable state; the replacement cache is not included."""
        return (
            self._allow_symbolic,
            self._unsafe_replacement,
            self._complex_auto_replace,
            self._auto_replace,
            self._replace_constraints,
            self._replacements,
            self._actual_frontend,
            self._validation_frontend,
            super().__getstate__(),
        )

    def __setstate__(self, s):
        """Restore state from ``__getstate__`` and rebuild the replacement cache."""
        (
            self._allow_symbolic,
            self._unsafe_replacement,
            self._complex_auto_replace,
            self._auto_replace,
            self._replace_constraints,
            self._replacements,
            self._actual_frontend,
            self._validation_frontend,
            base_state,
        ) = s

        super().__setstate__(base_state)
        self._replacement_cache = weakref.WeakKeyDictionary(self._replacements)

    #
    # Replacement solving
    #

    def _replace_list(self, lst):
        """Return a tuple with ``_replacement`` applied to every item of ``lst``."""
        return tuple(self._replacement(c) for c in lst)

    def eval(self, e, n, extra_constraints=(), exact=None):
        """Evaluate the rewritten ``e`` on the wrapped frontend.

        With ``unsafe_replacement`` enabled, the first result is offered to
        ``_add_solve_result`` as a replacement for ``e``.
        """
        er = self._replacement(e)
        ecr = self._replace_list(extra_constraints)
        r = self._actual_frontend.eval(er, n, extra_constraints=ecr, exact=exact)
        if self._unsafe_replacement:
            self._add_solve_result(e, er, r[0])
        return r

    def batch_eval(self, exprs, n, extra_constraints=(), exact=None):
        """Evaluate the rewritten ``exprs`` together on the wrapped frontend.

        With ``unsafe_replacement`` enabled, the values of the first result are
        offered to ``_add_solve_result``, one per original expression.
        """
        er = self._replace_list(exprs)
        ecr = self._replace_list(extra_constraints)
        r = self._actual_frontend.batch_eval(er, n, extra_constraints=ecr, exact=exact)
        if self._unsafe_replacement:
            for i, original in enumerate(exprs):
                self._add_solve_result(original, er[i], r[0][i])
        return r

    def max(self, e, extra_constraints=(), signed=False, exact=None):
        """Return the wrapped frontend's ``max`` for the rewritten ``e``."""
        er = self._replacement(e)
        ecr = self._replace_list(extra_constraints)
        r = self._actual_frontend.max(er, extra_constraints=ecr, signed=signed, exact=exact)
        if self._unsafe_replacement:
            self._add_solve_result(e, er, r)
        return r

    def min(self, e, extra_constraints=(), signed=False, exact=None):
        """Return the wrapped frontend's ``min`` for the rewritten ``e``."""
        er = self._replacement(e)
        ecr = self._replace_list(extra_constraints)
        r = self._actual_frontend.min(er, extra_constraints=ecr, signed=signed, exact=exact)
        if self._unsafe_replacement:
            self._add_solve_result(e, er, r)
        return r

    def solution(self, e, v, extra_constraints=(), exact=None):
        """Return the wrapped frontend's ``solution`` for the rewritten ``e`` and ``v``.

        With ``unsafe_replacement`` enabled, a truthy result with a non-symbolic
        rewritten ``v`` is offered to ``_add_solve_result``.
        """
        er = self._replacement(e)
        vr = self._replacement(v)
        ecr = self._replace_list(extra_constraints)
        r = self._actual_frontend.solution(er, vr, extra_constraints=ecr, exact=exact)
        if self._unsafe_replacement and r and (not isinstance(vr, Base) or not vr.symbolic):
            self._add_solve_result(e, er, vr)
        return r

    def is_true(self, e, extra_constraints=(), exact=None):
        """Return the wrapped frontend's ``is_true`` for the rewritten ``e``."""
        er = self._replacement(e)
        ecr = self._replace_list(extra_constraints)
        return self._actual_frontend.is_true(er, extra_constraints=ecr, exact=exact)

    def is_false(self, e, extra_constraints=(), exact=None):
        """Return the wrapped frontend's ``is_false`` for the rewritten ``e``."""
        er = self._replacement(e)
        ecr = self._replace_list(extra_constraints)
        return self._actual_frontend.is_false(er, extra_constraints=ecr, exact=exact)

    def satisfiable(self, extra_constraints=(), exact=None):
        """Return the wrapped frontend's ``satisfiable`` with rewritten extra constraints."""
        ecr = self._replace_list(extra_constraints)
        return self._actual_frontend.satisfiable(extra_constraints=ecr, exact=exact)

    def _concrete_value(self, e):
        """Return a concrete value for ``e``, or ``None`` if none can be found.

        The parent implementation is tried first. Otherwise the rewritten
        expression is evaluated on each eager backend in turn, skipping any
        that raise ``BackendError``.
        """
        c = super()._concrete_value(e)
        if c is not None:
            return c

        cr = self._replacement(e)
        for b in backends._eager_backends:
            try:
                return b.eval(cr, 1)[0]
            except BackendError:
                pass
        return None

    def _concrete_constraint(self, e):
        """Return the concrete form of constraint ``e``.

        The parent's ``_concrete_value`` is tried first. Otherwise the parent's
        ``_concrete_constraint`` is used, on the rewritten constraint when
        ``replace_constraints`` is enabled and on ``e`` itself when it is not.
        """
        c = super()._concrete_value(e)
        if c is not None:
            return c

        # if er.is_false():
        #   raise UnsatError("Replacement frontend received False constraint after replacement.")
        if self._replace_constraints:
            er = self._replacement(e)
            return super()._concrete_constraint(er)
        else:
            return super()._concrete_constraint(e)

    def add(self, constraints, **kwargs):
        """Add constraints, deriving replacements from them when enabled.

        With ``auto_replace`` enabled, each symbolic constraint may register
        replacements before being added:

        * simple mode: ``Not(x)`` maps ``x`` to false, and an equality with
          exactly one symbolic side maps that side to the other;
        * complex mode (``complex_auto_replace``): replacements come from the
          ``Balancer``; an unsatisfiable constraint maps to false.

        The constraints are then added through the parent class, rewritten,
        and forwarded to the wrapped frontend.

        :param constraints: Iterable of constraints to add.
        :param kwargs:      Forwarded to both ``add`` calls.
        :returns:           Whatever the parent ``add`` returns.
        :raises ClaripyFrontendError: if ``allow_symbolic`` is false and a
                                      rewritten constraint is still symbolic.
        """
        if self._auto_replace:
            for c in constraints:
                # the badass thing here would be to use the *replaced* constraint, but
                # we don't currently support chains of replacements, so we'll do a
                # less effective flat-replacement with the original constraint
                # rc = self._replacement(c)
                rc = c
                if not isinstance(rc, Base) or not rc.symbolic:
                    continue

                if not self._complex_auto_replace:
                    if rc.op == "Not":
                        self.add_replacement(c.args[0], false, replace=False, promote=True, invalidate_cache=True)
                    elif rc.op == "__eq__" and rc.args[0].symbolic ^ rc.args[1].symbolic:
                        # exactly one side is symbolic: replace it with the other side
                        old, new = rc.args if rc.args[0].symbolic else rc.args[::-1]
                        self.add_replacement(old, new, replace=False, promote=True, invalidate_cache=True)
                else:
                    satisfiable, replacements = Balancer(
                        backends.vsa, rc, validation_frontend=self._validation_frontend
                    ).compat_ret
                    if not satisfiable:
                        self.add_replacement(rc, false)
                    for old, new in replacements:
                        if old.cardinality == 1:
                            continue

                        rold = self._replacement(old)
                        if rold.cardinality == 1:
                            continue

                        self.add_replacement(old, rold.intersection(new))

        added = super().add(constraints, **kwargs)
        cr = self._replace_list(added)
        if not self._allow_symbolic and any(c.symbolic for c in cr):
            raise ClaripyFrontendError(
                "symbolic constraints made it into ReplacementFrontend with allow_symbolic=False"
            )
        self._actual_frontend.add(cr, **kwargs)

        return added