Skip to content

Commit

Permalink
Merge pull request #301 from BenjaminSoelberg/master
Browse files Browse the repository at this point in the history
Added an option to set the maximum number of times emulation can revisit addresses.
  • Loading branch information
williballenthin committed Jan 9, 2019
2 parents d27b1c5 + b9168a5 commit 0edd694
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 15 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -19,3 +19,7 @@ lib/

# Test executables
bin/

# PyCharm
.idea
venv
12 changes: 6 additions & 6 deletions floss/function_argument_getter.py
Expand Up @@ -51,12 +51,12 @@ def __init__(self, vivisect_workspace):
self.driver = viv_utils.emulator_drivers.FunctionRunnerEmulatorDriver(self.emu)
self.index = viv_utils.InstructionFunctionIndex(vivisect_workspace)

def get_all_function_contexts(self, function_va):
def get_all_function_contexts(self, function_va, max_hits):
self.d("Getting function context for function at 0x%08X...", function_va)

all_contexts = []
for caller_va in self.get_caller_vas(function_va):
function_context = self.get_contexts_via_monitor(caller_va, function_va)
function_context = self.get_contexts_via_monitor(caller_va, function_va, max_hits)
all_contexts.extend(function_context)

self.d("Got %d function contexts for function at 0x%08X.", len(all_contexts), function_va)
Expand Down Expand Up @@ -92,7 +92,7 @@ def get_caller_vas(self, function_va):
caller_function_vas.add(caller_function_va)
return caller_function_vas

def get_contexts_via_monitor(self, fva, target_fva):
def get_contexts_via_monitor(self, fva, target_fva, max_hits):
"""
run the given function while collecting arguments to a target function
"""
Expand All @@ -106,7 +106,7 @@ def get_contexts_via_monitor(self, fva, target_fva):
monitor = CallMonitor(self.vivisect_workspace, target_fva)
with installed_monitor(self.driver, monitor):
with api_hooks.defaultHooks(self.driver):
self.driver.runFunction(self.index[fva], maxhit=1, maxrep=0x1000, func_only=True)
self.driver.runFunction(self.index[fva], maxhit=max_hits, maxrep=0x1000, func_only=True)
contexts = monitor.get_contexts()

self.d(" results:")
Expand All @@ -116,5 +116,5 @@ def get_contexts_via_monitor(self, fva, target_fva):
return contexts


def get_function_contexts(vw, fva):
return FunctionArgumentGetter(vw).get_all_function_contexts(fva)
def get_function_contexts(vw, fva, max_hits):
    """
    Collect the calling contexts of the given function by emulating its callers.

    Thin convenience wrapper around FunctionArgumentGetter.

    :param vw: vivisect workspace in which the function is defined
    :param fva: address of the function whose contexts to collect
    :param max_hits: maximum number of hits per address during emulation
    :return: list of function contexts
    """
    getter = FunctionArgumentGetter(vw)
    return getter.get_all_function_contexts(fva, max_hits)
26 changes: 19 additions & 7 deletions floss/main.py
Expand Up @@ -55,21 +55,22 @@ def hex(i):
return "0x%X" % (i)


def decode_strings(vw, decoding_functions_candidates, min_length, no_filter=False, max_instruction_count=20000):
def decode_strings(vw, decoding_functions_candidates, min_length, no_filter=False, max_instruction_count=20000, max_hits=1):
"""
FLOSS string decoding algorithm
:param vw: vivisect workspace
:param decoding_functions_candidates: identification manager
:param min_length: minimum string length
:param no_filter: do not filter decoded strings
:param max_instruction_count: The maximum number of instructions to emulate per function.
:param max_hits: The maximum number of hits per address
:return: list of decoded strings ([DecodedString])
"""
decoded_strings = []
function_index = viv_utils.InstructionFunctionIndex(vw)
# TODO pass function list instead of identification manager
for fva, _ in decoding_functions_candidates.get_top_candidate_functions(10):
for ctx in string_decoder.extract_decoding_contexts(vw, fva):
for ctx in string_decoder.extract_decoding_contexts(vw, fva, max_hits):
for delta in string_decoder.emulate_decoding_routine(vw, function_index, fva, ctx, max_instruction_count):
for delta_bytes in string_decoder.extract_delta_bytes(delta, ctx.decoded_at_va, fva):
for decoded_string in string_decoder.extract_strings(delta_bytes, min_length, no_filter):
Expand Down Expand Up @@ -146,7 +147,9 @@ def make_parser():
help="do not filter deobfuscated strings (may result in many false positive strings)",
action="store_true")
parser.add_option("--max-instruction-count", dest="max_instruction_count", type=int, default=20000,
help="maximum number of instructions to emulate per function")
help="maximum number of instructions to emulate per function (default is 20000)")
parser.add_option("--max-address-revisits", dest="max_address_revisits", type=int, default=0,
help="maximum number of address revisits per function (default is 0)")

shellcode_group = OptionGroup(parser, "Shellcode options", "Analyze raw binary file containing shellcode")
shellcode_group.add_option("-s", "--shellcode", dest="is_shellcode", help="analyze shellcode",
Expand Down Expand Up @@ -420,7 +423,7 @@ def filter_unique_decoded(decoded_strings):
unique_values = set()
originals = []
for decoded in decoded_strings:
hashable = (decoded.va, decoded.s, decoded.decoded_at_va, decoded.fva)
hashable = (decoded.s, decoded.decoded_at_va, decoded.fva)
if hashable not in unique_values:
unique_values.add(hashable)
originals.append(decoded)
Expand Down Expand Up @@ -486,10 +489,10 @@ def print_decoding_results(decoded_strings, group_functions, quiet=False, expert
:param quiet: print strings only, suppresses headers
:param expert: expert mode
"""
if not quiet:
print("\nFLOSS decoded %d strings" % len(decoded_strings))

if group_functions:
if not quiet:
print("\nFLOSS decoded %d strings" % len(decoded_strings))
fvas = set(map(lambda i: i.fva, decoded_strings))
for fva in fvas:
grouped_strings = filter(lambda ds: ds.fva == fva, decoded_strings)
Expand All @@ -499,6 +502,12 @@ def print_decoding_results(decoded_strings, group_functions, quiet=False, expert
print("\nDecoding function at 0x%X (decoded %d strings)" % (fva, len_ds))
print_decoded_strings(grouped_strings, quiet=quiet, expert=expert)
else:
if not expert:
seen = set()
decoded_strings = [x for x in decoded_strings if not (x.s in seen or seen.add(x.s))]
if not quiet:
print("\nFLOSS decoded %d strings" % len(decoded_strings))

print_decoded_strings(decoded_strings, quiet=quiet, expert=expert)


Expand Down Expand Up @@ -941,7 +950,10 @@ def main(argv=None):
print_identification_results(sample_file_path, decoding_functions_candidates)

floss_logger.info("Decoding strings...")
decoded_strings = decode_strings(vw, decoding_functions_candidates, min_length, options.no_filter, options.max_instruction_count)
decoded_strings = decode_strings(vw, decoding_functions_candidates, min_length, options.no_filter,
options.max_instruction_count, options.max_address_revisits + 1)
# TODO: The de-duplication process isn't perfect as it is done here and in print_decoding_results and
# TODO: all of them on non-sanitized strings.
if not options.expert:
decoded_strings = filter_unique_decoded(decoded_strings)
print_decoding_results(decoded_strings, options.group_functions, quiet=options.quiet, expert=options.expert)
Expand Down
5 changes: 3 additions & 2 deletions floss/string_decoder.py
Expand Up @@ -91,7 +91,7 @@ def memdiff(bytes1, bytes2):
return diffs


def extract_decoding_contexts(vw, function):
def extract_decoding_contexts(vw, function, max_hits):
'''
Extract the CPU and memory contexts of all calls to the given function.
Under the hood, we brute-force emulate all code paths to extract the
Expand All @@ -101,9 +101,10 @@ def extract_decoding_contexts(vw, function):
:param vw: The vivisect workspace in which the function is defined.
:type function: int
:param function: The address of the function whose contexts we'll find.
:param max_hits: The maximum number of hits per address
:rtype: Sequence[function_argument_getter.FunctionContext]
'''
return get_function_contexts(vw, function)
return get_function_contexts(vw, function, max_hits)


def emulate_decoding_routine(vw, function_index, function, context, max_instruction_count):
Expand Down

0 comments on commit 0edd694

Please sign in to comment.