Skip to content
Branch: master
Find file Copy path
Find file Copy path
5 contributors

Users who have contributed to this file

@thrsten @Phat3 @antoniobianchi333 @ltfish @conand
408 lines (340 sloc) 16.5 KB
import logging
from archinfo.arch_soot import (ArchSoot, SootAddressDescriptor,
SootAddressTerminator, SootArgument,
from ... import sim_options as o
from ...errors import SimEngineError, SimTranslationError
from cle import CLEError
from ...state_plugins.inspect import BP_AFTER, BP_BEFORE
from ..engine import SimEngine
from .exceptions import BlockTerminationNotice, IncorrectLocationException
from .statements import (SimSootStmt_Return, SimSootStmt_ReturnVoid,
from .values import SimSootValue_Local, SimSootValue_ParamRef
l = logging.getLogger('angr.engines.soot.engine')
# pylint: disable=arguments-differ
class SimEngineSoot(SimEngine):
Execution engine based on Soot.
def __init__(self, project=None, **kwargs):
super(SimEngineSoot, self).__init__(**kwargs)
self.project = project
def lift(self, addr=None, the_binary=None, **kwargs): # pylint: disable=unused-argument, no-self-use
assert isinstance(addr, SootAddressDescriptor)
method, stmt_idx = addr.method, addr.stmt_idx
method = the_binary.get_soot_method(method, params=method.params)
except CLEError as ex:
raise SimTranslationError("CLE error: {}".format(ex))
if stmt_idx is None:
return method.blocks[0] if method.blocks else None
# _, block = method.block_by_label.floor_item(stmt_idx)
#except KeyError:
# return None
#return block
# TODO: Re-enable the above code once bintrees are used
# FIXME: stmt_idx does not index from the start of the method but from the start
# of the block therefore it always returns the block with label 0 independently
# of where we are
# block = method.block_by_label.get(stmt_idx, None)
# if block is not None:
# return block
# Slow path
for block_idx, block in enumerate(method.blocks):
# if block.label <= stmt_idx < block.label + len(block.statements):
if block_idx == addr.block_idx:
return block
return None
def _check(self, state, *args, **kwargs):
return isinstance(state._ip, SootAddressDescriptor)
def _process(self, state, successors, *args, **kwargs):
addr = state._ip
if isinstance(addr, SootAddressTerminator):
successors.processed = True
if self.project.use_sim_procedures:
procedure = self._get_sim_procedure(addr)
if procedure is not None:
self.project.factory.procedure_engine._process(state, successors, procedure)
binary = state.regs._ip_binary
method = binary.get_soot_method(addr.method, none_if_missing=True)
# TODO make the skipping of code in "android.*" classes optional
if addr.method.class_name.startswith('android.') or not method:
# This means we are executing code that is not in CLE, typically library code.
# We may want soot -> pysoot -> cle to export at least the method names of the libraries
# (soot has a way to deal with this), as of now we just "simulate" a return.
# Note: If we have a sim procedure, we should not reach this point.
l.warning("Try to execute non-loaded code %s. Execute unconstrained SimProcedure.", addr)
# STEP 1: Get unconstrained SimProcedure
procedure = self.get_unconstrained_simprocedure()
# STEP 2: Pass Method descriptor as Parameter
# check if there are already params in the stack
param_idx = 0
param_ref = state.javavm_memory.load(SimSootValue_ParamRef(param_idx, None), none_if_missing=True)
while param_ref is not None:
param_idx += 1
param_ref = state.javavm_memory.load(SimSootValue_ParamRef(param_idx, None), none_if_missing=True)
# store all function arguments in memory, starting from the last param index, None), addr.method)
# STEP 4: Execute unconstrained procedure
self.project.factory.procedure_engine._process(state, successors, procedure)
# self._add_return_exit(state, successors)
block = method.blocks[addr.block_idx]
starting_stmt_idx = addr.stmt_idx
if starting_stmt_idx == 0:
l.debug("Executing new block %s \n\n%s\n", addr, block)
# l.debug("Continue executing block %s", addr)
l.debug("Continue executing block %s \n\n%s\n", addr, block)
self._handle_block(state, successors, block, starting_stmt_idx, method)
successors.processed = True
def _handle_block(self, state, successors, block, starting_stmt_idx, method=None):
for tindex, stmt in enumerate(block.statements[starting_stmt_idx:]):
stmt_idx = starting_stmt_idx + tindex
state._inspect('statement', BP_BEFORE, statement=stmt_idx)
terminate = self._handle_statement(state, successors, stmt_idx, stmt)
state._inspect('statement', BP_AFTER)
if terminate:
if stmt is None:
l.warning("Executed empty bb, maybe pc is broken")
if method is not None:
next_addr = self._get_next_linear_instruction(state, stmt_idx)
l.debug("Advancing execution linearly to %s", next_addr)
if next_addr is not None:
successors.add_successor(state.copy(), next_addr, state.solver.true, 'Ijk_Boring')
def _handle_statement(self, state, successors, stmt_idx, stmt):
# execute statement
l.debug("Executing statement: %s", stmt)
s_stmt = translate_stmt(stmt, state)
except SimEngineError as e:
l.error("Skipping statement: %s", e)
return False
# add invoke exit
if s_stmt.has_invoke_target:
invoke_state = state.copy()
# parse invoke expression
invoke_expr = s_stmt.invoke_expr
method = invoke_expr.method
args = invoke_expr.args
ret_var = invoke_expr.ret_var if hasattr(invoke_expr, 'ret_var') else None
# setup callsite
ret_addr = self._get_next_linear_instruction(state, stmt_idx)
if 'NATIVE' in method.attrs:
# the target of the call is a native function
# => we need to setup a native call-site
l.debug("Native invoke: %r", method)
addr = self.project.simos.get_addr_of_native_method(method)
if not addr:
# native function could not be found
# => skip invocation and continue execution linearly
return False
invoke_state = self._setup_native_callsite(invoke_state, addr, method, args, ret_addr, ret_var)
l.debug("Invoke: %r", method)
self.setup_callsite(invoke_state, args, ret_addr, ret_var)
addr = SootAddressDescriptor(method, 0, 0)
# add invoke state as the successor and terminate execution
# prematurely, since Soot does not guarantee that an invoke stmt
# terminates a block
successors.add_successor(invoke_state, addr, state.solver.true, 'Ijk_Call')
return True
# add jmp exit
elif s_stmt.has_jump_targets:
for target, condition in s_stmt.jmp_targets_with_conditions:
if not target:
target = self._get_next_linear_instruction(state, stmt_idx)
l.debug("Possible jump: %s -> %s", state._ip, target)
successors.add_successor(state.copy(), target, condition, 'Ijk_Boring')
return True
# add return exit
elif isinstance(s_stmt, (SimSootStmt_Return, SimSootStmt_ReturnVoid)):
l.debug("Return exit")
self._add_return_exit(state, successors, s_stmt.return_value)
return True
# go on linearly
return False
def _add_return_exit(cls, state, successors, return_val=None):
ret_state = state.copy()
cls.prepare_return_state(ret_state, return_val)
successors.add_successor(ret_state, state.callstack.ret_addr, ret_state.solver.true, 'Ijk_Ret')
successors.processed = True
def _get_sim_procedure(self, addr):
# Delayed import
from ...procedures import SIM_PROCEDURES
if addr in self.project._sim_procedures:
return self.project._sim_procedures[addr]
method = addr.method
class_name = method.class_name
method_prototype = "%s(%s)" % (, ",".join(method.params))
if class_name in SIM_PROCEDURES and \
method_prototype in SIM_PROCEDURES[class_name]:
procedure_cls = SIM_PROCEDURES[class_name][method_prototype]
return None
# Lazy-initialize it
proc = procedure_cls(project=self.project)
self.project._sim_procedures[addr] = proc
return proc
def get_unconstrained_simprocedure(self):
# Delayed import
from ...procedures import SIM_PROCEDURES
# TODO: fix method prototype
procedure_cls = SIM_PROCEDURES["angr.unconstrained"]["unconstrained()"]
# Lazy-initialize it
proc = procedure_cls(project=self.project)
return proc
def _is_method_beginning(addr):
return addr.block_idx == 0 and addr.stmt_idx == 0
def _get_next_linear_instruction(state, stmt_idx):
addr = state.addr.copy()
addr.stmt_idx = stmt_idx
method = state.regs._ip_binary.get_soot_method(addr.method)
current_bb = method.blocks[addr.block_idx]
new_stmt_idx = addr.stmt_idx + 1
if new_stmt_idx < len(current_bb.statements):
return SootAddressDescriptor(addr.method, addr.block_idx, new_stmt_idx)
new_bb_idx = addr.block_idx + 1
if new_bb_idx < len(method.blocks):
return SootAddressDescriptor(addr.method, new_bb_idx, 0)
l.warning("falling into a non existing bb: %d in %s",
new_bb_idx, SootMethodDescriptor.from_soot_method(method))
raise IncorrectLocationException()
def setup_callsite(cls, state, args, ret_addr, ret_var=None):
# push new callstack frame
state.callstack.ret_addr = ret_addr
state.callstack.invoke_return_variable = ret_var
# push new memory stack frame
# setup arguments
if args:
cls.setup_arguments(state, list(args))
def setup_arguments(state, args):
# if available, store the 'this' reference
if len(args) > 0 and args[0].is_this_ref:
this_ref = args.pop(0)
local = SimSootValue_Local('this', this_ref.type), this_ref.value)
# store all function arguments in memory
for idx, arg in enumerate(args):
param_ref = SimSootValue_ParamRef(idx, arg.type), arg.value)
def prepare_return_state(state, ret_value=None):
# pop callstack
ret_var = state.callstack.invoke_return_variable
procedure_data = state.callstack.procedure_data
# pass procedure data to the current callstack (if available)
# => this should get removed by the corresponding sim procedure
state.callstack.procedure_data = procedure_data
# pop memory frame
# save return value
if ret_value is not None:
l.debug("Assign %r := %r", ret_var, ret_value)
if ret_var is not None:
# usually the return value is read from the previous stack frame, ret_value)
# however if we call a method from outside (e.g. project.state_call),
# no previous stack frame exist and the return variable is not set
# => for this cases, we store the value in the registers, so it can
# still be accessed
state.regs.invoke_return_value = ret_value
def terminate_execution(statement, state, successors):
l.debug("Returning with an empty stack: ending execution")
# this code is coming by sim_procedure.exit()
exit_code = 0
if type(statement) is SimSootStmt_Return:
exit_code = statement.return_value
# TODO symbolic exit code?
exit_code = state.solver.BVV(exit_code, state.arch.bits)
state.history.add_event('terminate', exit_code=exit_code)
successors.add_successor(state, state.regs.ip, state.solver.true, 'Ijk_Exit')
successors.processed = True
raise BlockTerminationNotice()
# JNI Native Interface
def prepare_native_return_state(native_state):
Hook target for native function call returns.
Recovers and stores the return value from native memory and toggles the
state, s.t. execution continues in the Soot engine.
javavm_simos = native_state.project.simos
ret_state = native_state.copy()
# set successor flags
ret_state.regs._ip = ret_state.callstack.ret_addr
ret_state.scratch.guard = ret_state.solver.true
ret_state.history.jumpkind = 'Ijk_Ret'
# if available, lookup the return value in native memory
ret_var = ret_state.callstack.invoke_return_variable
if ret_var is not None:
# get return symbol from native state
native_cc = javavm_simos.get_native_cc()
ret_symbol = native_cc.get_return_val(native_state).to_claripy()
# convert value to java type
if ret_var.type in ArchSoot.primitive_types:
# return value has a primitive type
# => we need to manually cast the return value to the correct size, as this
# would be usually done by the java callee
ret_value = javavm_simos.cast_primitive(ret_state, ret_symbol,
# return value has a reference type
# => ret_symbol is a opaque ref
# => lookup corresponding java reference
ret_value = ret_state.jni_references.lookup(ret_symbol)
ret_value = None
# teardown return state
SimEngineSoot.prepare_return_state(ret_state, ret_value)
# finally, delete all local references
return [ret_state]
def _setup_native_callsite(cls, state, native_addr, java_method, args, ret_addr, ret_var):
# Step 1: setup java callsite, but w/o storing arguments in memory
cls.setup_callsite(state, None, ret_addr, ret_var)
# Step 2: add JNI specific arguments to *args list
# get JNI environment pointer
jni_env = SootArgument(state.project.simos.jni_env, "JNIEnv")
# get reference to the current object or class
if args and args[0].is_this_ref:
# instance method call
# => pass 'this' reference to native code
ref = args.pop(0)
# static method call
# => pass 'class' reference to native code
class_ = state.javavm_classloader.get_class(java_method.class_name, init_class=True)
ref = SootArgument(class_, "Class")
# add to args
final_args = [jni_env, ref] + args
# Step 3: create native invoke state
return state.project.simos.state_call(native_addr, *final_args,
You can’t perform that action at this time.