-
Notifications
You must be signed in to change notification settings - Fork 1k
/
engine.py
642 lines (551 loc) · 26.5 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from cachetools import LRUCache
import pytcg
import claripy
from archinfo import ArchARM
from ... import sim_options as o
from ...state_plugins.inspect import BP_AFTER, BP_BEFORE
from ...state_plugins.sim_action import SimActionExit, SimActionObject
from ...errors import (SimError, SimIRSBError, SimSolverError, SimMemoryAddressError, SimReliftException,
UnsupportedDirtyError, SimTranslationError, SimEngineError, SimSegfaultError,
SimMemoryError, SimIRSBNoDecodeError, AngrAssemblyError)
from ..engine import SimEngine
#from .statements import translate_stmt
# from .expressions import translate_expr
import logging
# Module-level logger used by every method of the engine below.
l = logging.getLogger("angr.engines.tcg.engine")
# NOTE(review): this pins the logger to DEBUG at import time, independent of the
# application's logging configuration -- confirm this is wanted outside development.
l.setLevel(logging.DEBUG)
#pylint: disable=arguments-differ
# Upper bounds that lift() applies to its `size` (bytes) and `num_inst` arguments.
# The VEX_ prefix appears to be inherited naming; these limits are used for TCG blocks here.
VEX_IRSB_MAX_SIZE = 400
VEX_IRSB_MAX_INST = 99
class SimEngineTCG(SimEngine):
"""
Execution engine based on TCG, Qemu's IR.
"""
def __init__(self, project=None,
             stop_points=None,
             use_cache=None,
             cache_size=50000,
             default_opt_level=1,
             support_selfmodifying_code=None,
             single_step=False):
    """
    Store the engine configuration and create an empty block cache.

    :param project:         Passed to the SimEngine initialiser; also the source of defaults for
                            ``use_cache`` and ``support_selfmodifying_code`` when those are None.
    :param stop_points:     Container of addresses consulted by ``is_stop_point`` (may be None).
    :param use_cache:       Whether ``lift`` caches blocks. None means "use
                            ``project._translation_cache``", or False when there is no project.
    :param cache_size:      Maximum number of entries held by the LRU block cache.
    :param default_opt_level: Optimization level ``lift`` falls back to when none is requested.
    :param support_selfmodifying_code: None means "use ``project._support_selfmodifying_code``",
                            or False when there is no project.
    :param single_step:     When set, ``lift`` defaults to one instruction per block.
    """
    super(SimEngineTCG, self).__init__(project)

    # Resolve the two tri-state flags: an explicit value wins, otherwise ask the project,
    # and fall back to False when no project is available.
    if use_cache is None:
        use_cache = False if project is None else project._translation_cache
    if support_selfmodifying_code is None:
        support_selfmodifying_code = False if project is None else project._support_selfmodifying_code

    self._stop_points = stop_points
    self._use_cache = use_cache
    self._default_opt_level = default_opt_level
    self._support_selfmodifying_code = support_selfmodifying_code
    self._single_step = single_step
    self._cache_size = cache_size

    # block cache
    self._block_cache = None
    self._block_cache_hits = 0
    self._block_cache_misses = 0
    self._initialize_block_cache()
def is_stop_point(self, addr):
    """
    Tell whether ``addr`` is a stop point.

    An address counts as a stop point when it is present in ``project._sim_procedures``
    (if a project is set) or in the configured ``stop_points`` (if any were given).

    :param addr: The address to test.
    :returns:    True if ``addr`` is a stop point, False otherwise.
    """
    hooked = self.project is not None and addr in self.project._sim_procedures
    if hooked:
        return True
    return self._stop_points is not None and addr in self._stop_points
def _initialize_block_cache(self):
    """Install a new, empty LRU block cache of ``_cache_size`` entries and zero the hit/miss counters."""
    fresh_cache = LRUCache(maxsize=self._cache_size)
    self._block_cache = fresh_cache
    self._block_cache_hits = self._block_cache_misses = 0
def process(self, state,
            irsb=None,
            skip_stmts=0,
            last_stmt=99999999,
            whitelist=None,
            inline=False,
            force_addr=None,
            insn_bytes=None,
            size=None,
            num_inst=None,
            traceflags=0,
            thumb=False,
            opt_level=None,
            **kwargs):
    """
    Execute one block on ``state`` by delegating to the base engine's ``process``.

    If ``insn_text`` is supplied as an extra keyword argument it is assembled first (through
    ``self.project.arch.asm``) and the resulting bytes are used as ``insn_bytes``.

    :param state:       The state with which to execute
    :param irsb:        A pre-lifted IRSB object to use for execution. If not provided one will be lifted.
    :param skip_stmts:  The number of statements to skip in processing
    :param last_stmt:   Do not execute any statements after this statement
    :param whitelist:   Only execute statements in this set
    :param inline:      This is an inline execution. Do not bother copying the state.
    :param force_addr:  Force execution to pretend that we're working at this concrete address
    :param thumb:       Whether the block should be lifted in ARM's THUMB mode. Also selects the
                        assembler mode when ``insn_text`` is given.
    :param opt_level:   The IR optimization level to use.
    :param insn_bytes:  A string of bytes to use for the block instead of the project.
    :param size:        The maximum size of the block, in bytes.
    :param num_inst:    The maximum number of instructions.
    :param traceflags:  traceflags to be passed to the lifter. (default: 0)
    :param kwargs:      Recognised keys: ``insn_text`` (assembly source to execute instead of
                        ``insn_bytes``) and ``addr`` (address to assemble at, default 0).
    :returns:           A SimSuccessors object categorizing the block's successors
    :raises SimEngineError:     If both ``insn_bytes`` and ``insn_text`` are given.
    :raises AngrAssemblyError:  If assembling ``insn_text`` yields None.
    """
    if 'insn_text' in kwargs:
        if insn_bytes is not None:
            raise SimEngineError("You cannot provide both 'insn_bytes' and 'insn_text'!")

        # `thumb` is a named parameter of this method, so it can never show up inside
        # **kwargs; it must be forwarded directly or Thumb text is assembled as ARM.
        insn_bytes = self.project.arch.asm(kwargs['insn_text'],
                                           addr=kwargs.get('addr', 0),
                                           thumb=thumb,
                                           as_bytes=True)

        if insn_bytes is None:
            raise AngrAssemblyError("Assembling failed. Please make sure keystone is installed, and the assembly"
                                    " string is correct.")

    return super(SimEngineTCG, self).process(state, irsb,
                                             skip_stmts=skip_stmts,
                                             last_stmt=last_stmt,
                                             whitelist=whitelist,
                                             inline=inline,
                                             force_addr=force_addr,
                                             insn_bytes=insn_bytes,
                                             size=size,
                                             num_inst=num_inst,
                                             traceflags=traceflags,
                                             thumb=thumb,
                                             opt_level=opt_level)
def _check(self, state, *args, **kwargs):
return True
def _process(self, state, successors, irsb=None, skip_stmts=0, last_stmt=99999999, whitelist=None, insn_bytes=None, size=None, num_inst=None, traceflags=0, thumb=False, opt_level=None):
    """
    Run one block on ``state`` and record the outcome in ``successors``.

    The block is lifted with ``self.lift`` unless ``irsb`` is supplied. If statement handling
    raises SimReliftException, execution resumes from the instruction that triggered it: the
    remaining ``size`` / ``num_inst`` budget is reduced, the dirty-address set is cleared and a
    new block is lifted at that address.

    :param state:       The state to execute on (modified in place).
    :param successors:  The SimSuccessors object to fill in; ``successors.addr`` is the block address.
    :param irsb:        Optional pre-lifted block.
    :param skip_stmts:  Number of leading statements to skip.
    :param last_stmt:   Index of the last statement to execute.
    :param whitelist:   If not None, only statements whose index is in this container are executed.
    :param insn_bytes:  Bytes to lift from instead of memory.
    :param size:        Maximum block size in bytes.
    :param num_inst:    Maximum number of instructions.
    :param traceflags:  Forwarded to ``lift``.
    :param thumb:       Forwarded to ``lift``.
    :param opt_level:   Forwarded to ``lift``.
    :raises SimSegfaultError: With STRICT_PAGE_ACCESS, if permissions cannot be read for the block
                              address, or (with ENABLE_NX) the page is not executable.
    :raises SimEngineError:   If a relift is required while ``insn_bytes`` was supplied.
    """
    successors.sort = 'IRSB'
    successors.description = 'IRSB'
    state.history.recent_block_count = 1
    state.scratch.guard = claripy.true
    state.scratch.sim_procedure = None
    addr = successors.addr

    state._inspect('irsb', BP_BEFORE, address=addr)
    while True:
        if irsb is None:
            irsb = self.lift(
                addr=addr,
                state=state,
                insn_bytes=insn_bytes,
                size=size,
                num_inst=num_inst,
                traceflags=traceflags,
                thumb=thumb,
                opt_level=opt_level)

        # if irsb.size == 0:
        #     if irsb.jumpkind == 'Ijk_NoDecode' and not state.project.is_hooked(irsb.addr):
        #         raise SimIRSBNoDecodeError("IR decoding error at %#x. You can hook this instruction with "
        #                                    "a python replacement using project.hook"
        #                                    "(%#x, your_function, length=length_of_instruction)." % (addr, addr))
        #     raise SimIRSBError("Empty IRSB passed to SimIRSB.")

        # check permissions, are we allowed to execute here? do we care?
        if o.STRICT_PAGE_ACCESS in state.options:
            try:
                perms = state.memory.permissions(addr)
            except SimMemoryError:
                raise SimSegfaultError(addr, 'exec-miss')
            else:
                # Symbolic permissions cannot be tested concretely, so only concrete ones are enforced.
                if not perms.symbolic:
                    perms = state.se.eval(perms)
                    # Bit value 4 is the flag tested for executability here.
                    if not perms & 4 and o.ENABLE_NX in state.options:
                        raise SimSegfaultError(addr, 'non-executable')

        #state.scratch.tyenv = irsb.tyenv
        state.scratch.irsb = irsb

        try:
            self._handle_irsb(state, successors, irsb, skip_stmts, last_stmt, whitelist)
        except SimReliftException as e:
            # Continue with the state captured at the point the relift was requested.
            state = e.state
            if insn_bytes is not None:
                raise SimEngineError("You cannot pass self-modifying code as insn_bytes!!!")
            new_ip = state.scratch.ins_addr
            # Shrink the remaining budget by what has already been executed.
            if size is not None:
                size -= new_ip - addr
            if num_inst is not None:
                num_inst -= state.scratch.num_insns
            addr = new_ip

            # clear the stage before creating the new IRSB
            state.scratch.dirty_addrs.clear()
            irsb = None
        except SimError as ex:
            ex.record_state(state)
            raise
        else:
            break
    state._inspect('irsb', BP_AFTER, address=addr)

    successors.processed = True
def _handle_irsb(self, state, successors, irsb, skip_stmts, last_stmt, whitelist):
    """
    Execute the statements of ``irsb`` on ``state`` and populate ``successors``.

    Steps, in order: record block artifacts, run each selected statement through
    ``_handle_statement``, add the default exit (unless a statement error aborted the block),
    then post-process call/syscall exits for CALLLESS and DO_RET_EMULATION.

    :param state:       The state being executed (modified in place).
    :param successors:  The SimSuccessors object receiving artifacts and successor states.
    :param irsb:        The lifted block; ``statements``, ``size``, ``addr``, ``next`` and
                        ``jumpkind`` are read from it.
    :param skip_stmts:  Statements with an index below this are skipped.
    :param last_stmt:   Statements with an index above this are skipped.
    :param whitelist:   If not None, statements whose index is not in it are skipped.

    NOTE(review): ``translate_expr`` is called below but its import is commented out at the
    top of this module -- confirm where it is meant to come from.
    """
    # shortcut. we'll be typing this a lot
    ss = irsb.statements
    num_stmts = len(ss)

    # fill in artifacts
    successors.artifacts['irsb'] = irsb
    successors.artifacts['irsb_size'] = irsb.size
    successors.artifacts['irsb_statements'] = irsb.statements

    insn_addrs = [ ]

    # if we've told the block to truncate before it ends, it will definitely have a default
    # exit barring errors
    has_default_exit = num_stmts <= last_stmt

    # This option makes us only execute the last four instructions
    if o.SUPER_FASTPATH in state.options:
        imark_counter = 0
        # Walk backwards; once four instruction boundaries have been seen, skip everything before.
        for i in xrange(len(ss) - 1, -1, -1):
            if type(ss[i]) is pytcg.TcgInstructionBoundary:
                imark_counter += 1
            if imark_counter >= 4:
                skip_stmts = max(skip_stmts, i)
                break

    # set the current basic block address that's being processed
    state.scratch.bbl_addr = irsb.addr

    for stmt_idx, stmt in enumerate(ss):
        # Instruction addresses are collected even for statements that end up skipped.
        if isinstance(stmt, pytcg.TcgInstructionBoundary):
            insn_addrs.append(stmt.addr)

        if stmt_idx < skip_stmts:
            l.debug("Skipping statement %d", stmt_idx)
            continue
        if last_stmt is not None and stmt_idx > last_stmt:
            l.debug("Truncating statement %d", stmt_idx)
            continue
        if whitelist is not None and stmt_idx not in whitelist:
            l.debug("Blacklisting statement %d", stmt_idx)
            continue

        try:
            state.scratch.stmt_idx = stmt_idx
            state._inspect('statement', BP_BEFORE, statement=stmt_idx)
            self._handle_statement(state, successors, stmt)
            state._inspect('statement', BP_AFTER)
        except UnsupportedDirtyError:
            if o.BYPASS_UNSUPPORTED_IRDIRTY not in state.options:
                raise
            # Bypass: give the destination temp (if any) an unconstrained value and log the event.
            # NOTE(review): relies on state.scratch.tyenv, whose assignment in _process is
            # commented out -- confirm it is populated elsewhere.
            if stmt.tmp not in (0xffffffff, -1):
                retval_size = state.scratch.tyenv.sizeof(stmt.tmp)
                retval = state.se.Unconstrained("unsupported_dirty_%s" % stmt.cee.name, retval_size, key=('dirty', stmt.cee.name))
                state.scratch.store_tmp(stmt.tmp, retval, None, None)
            state.history.add_event('resilience', resilience_type='dirty', dirty=stmt.cee.name,
                                    message='unsupported Dirty call')
        except (SimSolverError, SimMemoryAddressError):
            # Abort the block here; the remaining statements (and the default exit) are dropped.
            l.warning("%#x hit an error while analyzing statement %d", successors.addr, stmt_idx, exc_info=True)
            has_default_exit = False
            break

    state.scratch.stmt_idx = num_stmts

    successors.artifacts['insn_addrs'] = insn_addrs

    # If there was an error, and not all the statements were processed,
    # then this block does not have a default exit. This can happen if
    # the block has an unavoidable "conditional" exit or if there's a legitimate
    # error in the simulation
    if has_default_exit:
        l.debug("%s adding default exit.", self)

        try:
            next_expr = translate_expr(irsb.next, state)
            state.history.extend_actions(next_expr.actions)

            if o.TRACK_JMP_ACTIONS in state.options:
                target_ao = SimActionObject(
                    next_expr.expr,
                    reg_deps=next_expr.reg_deps(), tmp_deps=next_expr.tmp_deps()
                )
                state.history.add_action(SimActionExit(state, target_ao, exit_type=SimActionExit.DEFAULT))
            successors.add_successor(state, next_expr.expr, state.scratch.guard, irsb.jumpkind,
                                     exit_stmt_idx='default', exit_ins_addr=state.scratch.ins_addr)

        except KeyError:
            # For some reason, the temporary variable that the successor relies on does not exist.
            # It can be intentional (e.g. when executing a program slice)
            # We save the current state anyways
            successors.unsat_successors.append(state)
            l.debug("The temporary variable for default exit of %s is missing.", self)
    else:
        l.debug("%s has no default exit", self)

    # do return emulation and calless stuff
    # Iterate over a copy: the DO_RET_EMULATION branch adds successors while we loop.
    for exit_state in list(successors.all_successors):

        exit_jumpkind = exit_state.history.jumpkind
        if exit_jumpkind is None: exit_jumpkind = ""

        if o.CALLLESS in state.options and exit_jumpkind == "Ijk_Call":
            # Turn the call into an immediate return to the address following this block,
            # with an unconstrained value in the return register.
            exit_state.registers.store(
                exit_state.arch.ret_offset,
                exit_state.se.Unconstrained('fake_ret_value', exit_state.arch.bits)
            )
            exit_state.scratch.target = exit_state.se.BVV(
                successors.addr + irsb.size, exit_state.arch.bits
            )
            exit_state.history.jumpkind = "Ijk_Ret"
            exit_state.regs.ip = exit_state.scratch.target

        elif o.DO_RET_EMULATION in exit_state.options and \
            (exit_jumpkind == "Ijk_Call" or exit_jumpkind.startswith('Ijk_Sys')):
            l.debug("%s adding postcall exit.", self)

            # Add an extra 'Ijk_FakeRet' successor that resumes right after this block.
            ret_state = exit_state.copy()
            guard = ret_state.se.true if o.TRUE_RET_EMULATION_GUARD in state.options else ret_state.se.false
            target = ret_state.se.BVV(successors.addr + irsb.size, ret_state.arch.bits)
            # Undo the return-address push for real calls on architectures that push one.
            if ret_state.arch.call_pushes_ret and not exit_jumpkind.startswith('Ijk_Sys'):
                ret_state.regs.sp = ret_state.regs.sp + ret_state.arch.bytes
            successors.add_successor(
                ret_state, target, guard, 'Ijk_FakeRet', exit_stmt_idx='default',
                exit_ins_addr=state.scratch.ins_addr
            )

    if whitelist and successors.is_empty:
        # If statements of this block are white-listed and none of the exit statement (not even the default exit) is
        # in the white-list, successors will be empty, and there is no way for us to get the final state.
        # To this end, a final state is manually created
        l.debug('Add an incomplete successor state as the result of an incomplete execution due to the white-list.')
        successors.flat_successors.append(state)
def _handle_statement(self, state, successors, stmt):
    """
    Execute a single IR statement on ``state``.

    For an instruction-boundary statement the per-instruction scratch bookkeeping is updated
    first, and SimReliftException is raised if any byte of that instruction is listed in
    ``state.scratch.dirty_addrs``. Every statement is then translated with ``translate_stmt``.
    For an exit statement a copy of ``state`` is added to ``successors`` and ``state`` itself
    is constrained to the path that does not take the exit.

    NOTE(review): neither ``translate_stmt`` (its import is commented out) nor ``pyvex`` is
    imported by this module as far as visible here, so this method raises NameError when it
    reaches them -- confirm the intended TCG replacements.

    :param state:       The state being executed (modified in place).
    :param successors:  The SimSuccessors object that receives conditional exits.
    :param stmt:        The statement to execute.
    """
    at_boundary = type(stmt) == pytcg.TcgInstructionBoundary
    if at_boundary:
        insn_start = stmt.addr
        state.scratch.ins_addr = insn_start

        # Raise an exception if we're suddenly in self-modifying code
        if any(offset + stmt.addr in state.scratch.dirty_addrs for offset in xrange(stmt.len)):
            raise SimReliftException(state)
        state._inspect('instruction', BP_AFTER)

        l.debug("IMark: %#x", stmt.addr)
        state.scratch.num_insns += 1
        state._inspect('instruction', BP_BEFORE, instruction=insn_start)

    # process it!
    translated = translate_stmt(stmt, state)
    if translated is not None:
        state.history.extend_actions(translated.actions)

    # Only exit statements need the successor/constraint bookkeeping below.
    if not (type(stmt) == pyvex.IRStmt.Exit):
        return

    l.debug("%s adding conditional exit", self)

    # The branch that takes the exit gets its own copy of the state;
    # SimSuccessors.add_successor deals with the details.
    taken_state = state.copy()
    successors.add_successor(taken_state, translated.target, translated.guard, translated.jumpkind,
                             exit_stmt_idx=state.scratch.stmt_idx, exit_ins_addr=state.scratch.ins_addr)

    # The continuing state must assume the exit was *not* taken.
    not_taken = claripy.Not(translated.guard)
    state.add_constraints(not_taken)
    state.scratch.guard = claripy.And(state.scratch.guard, not_taken)
def lift(self,
         state=None,
         clemory=None,
         insn_bytes=None,
         arch=None,
         addr=None,
         size=None,
         num_inst=None,
         traceflags=0,
         thumb=False,
         opt_level=None):
    """
    Lift an IRSB.

    There are many possible valid sets of parameters. You at the very least must pass some
    source of data, some source of an architecture, and some source of an address.

    Sources of data in order of priority: insn_bytes, clemory, state

    Sources of an address, in order of priority: addr, state

    Sources of an architecture, in order of priority: arch, clemory, state

    :param state:           A state to use as a data source.
    :param clemory:         A cle.memory.Clemory object to use as a data source.
    :param addr:            The address at which to start the block.
    :param thumb:           Whether the block should be lifted in ARM's THUMB mode.
    :param opt_level:       The optimization level to use. The final IR optimization level is determined by
                            (ordered by priority):
                            - Argument opt_level
                            - opt_level is set to 1 if OPTIMIZE_IR exists in state options
                            - self._default_opt_level
                            When self-modifying code support is enabled, any level above 0 is forced to 0.
    :param insn_bytes:      A string of bytes to use as a data source.
    :param size:            The maximum size of the block, in bytes.
    :param num_inst:        The maximum number of instructions.
    :param traceflags:      traceflags to be passed to the lifter. (default: 0)
    :returns:               The lifted (or cached) block object.
    :raises ValueError:     If no source of data, architecture or address was provided.
    :raises SimEngineError: If no bytes could be loaded for the block.
    """
    # phase 0: sanity check
    if not state and not clemory and not insn_bytes:
        raise ValueError("Must provide state or clemory or insn_bytes!")
    if not state and not clemory and not arch:
        raise ValueError("Must provide state or clemory or arch!")
    if addr is None and not state:
        raise ValueError("Must provide state or addr!")
    if arch is None:
        arch = clemory._arch if clemory else state.arch
    # NOTE(review): this permanently turns single-stepping off on the engine instance,
    # not just for this call.
    if arch.name.startswith("MIPS") and self._single_step:
        l.error("Cannot specify single-stepping on MIPS.")
        self._single_step = False

    # phase 1: parameter defaults
    if addr is None:
        addr = state.se.eval(state._ip)
    if size is not None:
        size = min(size, VEX_IRSB_MAX_SIZE)
    if size is None:
        size = VEX_IRSB_MAX_SIZE
    if num_inst is not None:
        num_inst = min(num_inst, VEX_IRSB_MAX_INST)
    if num_inst is None and self._single_step:
        num_inst = 1
    if opt_level is None:
        if state and o.OPTIMIZE_IR in state.options:
            opt_level = 1
        else:
            opt_level = self._default_opt_level
    if self._support_selfmodifying_code:
        if opt_level > 0:
            l.warning("Self-modifying code is not always correctly optimized by PyVEX. To guarantee correctness, VEX optimizations have been disabled.")
            opt_level = 0
            # Also drop the option from the state so later lifts do not re-enable optimization.
            if state and o.OPTIMIZE_IR in state.options:
                state.options.remove(o.OPTIMIZE_IR)

    # phase 2: thumb normalization
    thumb = int(thumb)
    if isinstance(arch, ArchARM):
        # An odd address selects THUMB mode; the low bit is then cleared from the address.
        if addr % 2 == 1:
            thumb = 1
        if thumb:
            addr &= ~1
    elif thumb:
        l.error("thumb=True passed on non-arm architecture!")
        thumb = 0

    # phase 3: check cache
    cache_key = (addr, insn_bytes, size, num_inst, thumb, opt_level)
    if self._use_cache and cache_key in self._block_cache:
        self._block_cache_hits += 1
        irsb = self._block_cache[cache_key]
        stop_point = self._first_stoppoint(irsb)
        if stop_point is None:
            return irsb
        else:
            # The cached block runs past a stop point: shrink the request so it ends there.
            size = stop_point - addr
            # check the cache again
            cache_key = (addr, insn_bytes, size, num_inst, thumb, opt_level)
            if cache_key in self._block_cache:
                self._block_cache_hits += 1
                return self._block_cache[cache_key]
            else:
                self._block_cache_misses += 1
    else:
        # a special case: `size` is used as the maximum allowed size
        # NOTE(review): this branch is also taken (and bumps the miss counter) when
        # caching is disabled.
        tmp_cache_key = (addr, insn_bytes, VEX_IRSB_MAX_SIZE, num_inst, thumb, opt_level)
        try:
            irsb = self._block_cache[tmp_cache_key]
            if irsb.size <= size:
                self._block_cache_hits += 1
                return self._block_cache[tmp_cache_key]
        except KeyError:
            self._block_cache_misses += 1

    # phase 4: get bytes
    if insn_bytes is not None:
        buff, size = insn_bytes, len(insn_bytes)
    else:
        buff, size = self._load_bytes(addr, size, state, clemory)

    if not buff or size == 0:
        raise SimEngineError("No bytes in memory for block starting at %#x." % addr)

    # phase 5: call into pytcg
    l.debug("Creating pyvex.IRSB of arch %s at %#x", arch.name, addr)
    try:
        # NOTE(review): with the stop-point re-lift below commented out, the first iteration
        # always returns, so this loop body runs at most once.
        for subphase in xrange(2):
            irsb = pytcg.IRSB(buff, addr + thumb, arch,
                              num_bytes=size,
                              num_inst=num_inst,
                              bytes_offset=thumb,
                              traceflags=traceflags,
                              opt_level=opt_level)

            # FIXME: What is this
            # if subphase == 0:
            #     # check for possible stop points
            #     stop_point = self._first_stoppoint(irsb)
            #     if stop_point is not None:
            #         size = stop_point - addr
            #         continue

            # The block is stored under the key computed in phase 3.
            if self._use_cache:
                self._block_cache[cache_key] = irsb
            return irsb

    # FIXME: Handle errors
    # NOTE(review): this handler only re-raises, so lifter errors currently propagate unchanged.
    except Exception as e:
        raise
    # phase x: error handling
    # except pyvex.PyVEXError:
    #     l.debug("VEX translation error at %#x", addr)
    #     if isinstance(buff, str):
    #         l.debug('Using bytes: %r', buff)
    #     else:
    #         l.debug("Using bytes: %r", pyvex.ffi.buffer(buff, size))
    #     e_type, value, traceback = sys.exc_info()
    #     raise SimTranslationError, ("Translation error", e_type, value), traceback
def _load_bytes(self, addr, max_size, state=None, clemory=None):
if not clemory:
if state is None:
raise SimEngineError('state and clemory cannot both be None in _load_bytes().')
if o.ABSTRACT_MEMORY in state.options:
# abstract memory
clemory = state.memory.regions['global'].memory.mem._memory_backer
else:
# symbolic memory
clemory = state.memory.mem._memory_backer
buff, size = "", 0
# Load from the clemory if we can
smc = self._support_selfmodifying_code
if state:
try:
p = state.memory.permissions(addr)
if p.symbolic:
smc = True
else:
smc = claripy.is_true(p & 2 != 0)
except: # pylint: disable=bare-except
smc = True # I don't know why this would ever happen, we checked this right?
if not smc or not state:
try:
buff, size = clemory.read_bytes_c(addr)
except KeyError:
pass
# If that didn't work, try to load from the state
if size == 0 and state:
if addr in state.memory and addr + max_size - 1 in state.memory:
buff = state.se.eval(state.memory.load(addr, max_size, inspect=False), cast_to=str)
size = max_size
else:
good_addrs = []
for i in xrange(max_size):
if addr + i in state.memory:
good_addrs.append(addr + i)
else:
break
buff = ''.join(chr(state.se.eval(state.memory.load(i, 1, inspect=False))) for i in good_addrs)
size = len(buff)
size = min(max_size, size)
return buff, size
def _first_stoppoint(self, irsb):
"""
Enumerate the imarks in the block. If any of them (after the first one) are at a stop point, returns the address
of the stop point. None is returned otherwise.
"""
if self._stop_points is None and self.project is None:
return None
first_imark = True
for stmt in irsb.statements:
if type(stmt) is pytcg.TcgInstructionBoundary: # pylint: disable=unidiomatic-typecheck
addr = stmt.addr
if not first_imark and self.is_stop_point(addr):
# could this part be moved by pyvex?
return addr
first_imark = False
return None
def clear_cache(self):
    """
    Discard every cached block and reset the hit/miss counters.

    Delegates to ``_initialize_block_cache`` so that cache construction lives in one place.
    """
    self._initialize_block_cache()
#
# Pickling
#
def __setstate__(self, state):
    """
    Restore the engine from a dictionary produced by ``__getstate__``.

    The block cache is not part of that dictionary; an empty one is created instead.

    :param state: Mapping holding the persisted attributes listed below.
    """
    persisted = (
        'project',
        '_stop_points',
        '_use_cache',
        '_default_opt_level',
        '_support_selfmodifying_code',
        '_single_step',
        '_cache_size',
    )
    for attr in persisted:
        setattr(self, attr, state[attr])

    # rebuild block cache
    self._initialize_block_cache()
def __getstate__(self):
s = {}
s['project'] = self.project
s['_stop_points'] = self._stop_points
s['_use_cache'] = self._use_cache
s['_default_opt_level'] = self._default_opt_level
s['_support_selfmodifying_code'] = self._support_selfmodifying_code
s['_single_step'] = self._single_step
s['_cache_size'] = self._cache_size
return s