Added an option to set the maximum number of times emulation can revisit addresses.

Increasing it improves string decoding within loops and complex control flows, but emulation takes longer.

Also improved the string de-duplication process (still not perfect).
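
For reference, a possible invocation using the new option (the sample file name is hypothetical, and this assumes the usual floss command-line entry point); internally the value is passed to the emulator as maxhit = max_address_revisits + 1:

    floss --max-address-revisits 2 --max-instruction-count 30000 suspicious_sample.exe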
BenjaminSoelberg committed Jan 8, 2019
1 parent d27b1c5 commit b9168a5e13aa20b73d8efb2ff830f2a2afbe623b
Showing with 32 additions and 15 deletions.
  1. +4 −0 .gitignore
  2. +6 −6 floss/function_argument_getter.py
  3. +19 −7 floss/main.py
  4. +3 −2 floss/string_decoder.py
@@ -19,3 +19,7 @@ lib/

# Test executables
bin/

# PyCharm
.idea
venv
@@ -51,12 +51,12 @@ def __init__(self, vivisect_workspace):
self.driver = viv_utils.emulator_drivers.FunctionRunnerEmulatorDriver(self.emu)
self.index = viv_utils.InstructionFunctionIndex(vivisect_workspace)

def get_all_function_contexts(self, function_va):
def get_all_function_contexts(self, function_va, max_hits):
self.d("Getting function context for function at 0x%08X...", function_va)

all_contexts = []
for caller_va in self.get_caller_vas(function_va):
function_context = self.get_contexts_via_monitor(caller_va, function_va)
function_context = self.get_contexts_via_monitor(caller_va, function_va, max_hits)
all_contexts.extend(function_context)

self.d("Got %d function contexts for function at 0x%08X.", len(all_contexts), function_va)
@@ -92,7 +92,7 @@ def get_caller_vas(self, function_va):
caller_function_vas.add(caller_function_va)
return caller_function_vas

def get_contexts_via_monitor(self, fva, target_fva):
def get_contexts_via_monitor(self, fva, target_fva, max_hits):
"""
run the given function while collecting arguments to a target function
"""
@@ -106,7 +106,7 @@ def get_contexts_via_monitor(self, fva, target_fva):
monitor = CallMonitor(self.vivisect_workspace, target_fva)
with installed_monitor(self.driver, monitor):
with api_hooks.defaultHooks(self.driver):
self.driver.runFunction(self.index[fva], maxhit=1, maxrep=0x1000, func_only=True)
self.driver.runFunction(self.index[fva], maxhit=max_hits, maxrep=0x1000, func_only=True)
contexts = monitor.get_contexts()

self.d(" results:")
@@ -116,5 +116,5 @@ def get_contexts_via_monitor(self, fva, target_fva):
return contexts


def get_function_contexts(vw, fva):
return FunctionArgumentGetter(vw).get_all_function_contexts(fva)
def get_function_contexts(vw, fva, max_hits):
return FunctionArgumentGetter(vw).get_all_function_contexts(fva, max_hits)
@@ -55,21 +55,22 @@ def hex(i):
return "0x%X" % (i)


def decode_strings(vw, decoding_functions_candidates, min_length, no_filter=False, max_instruction_count=20000):
def decode_strings(vw, decoding_functions_candidates, min_length, no_filter=False, max_instruction_count=20000, max_hits=1):
"""
FLOSS string decoding algorithm
:param vw: vivisect workspace
:param decoding_functions_candidates: identification manager
:param min_length: minimum string length
:param no_filter: do not filter decoded strings
:param max_instruction_count: The maximum number of instructions to emulate per function.
:param max_hits: The maximum number of hits per address
:return: list of decoded strings ([DecodedString])
"""
decoded_strings = []
function_index = viv_utils.InstructionFunctionIndex(vw)
# TODO pass function list instead of identification manager
for fva, _ in decoding_functions_candidates.get_top_candidate_functions(10):
for ctx in string_decoder.extract_decoding_contexts(vw, fva):
for ctx in string_decoder.extract_decoding_contexts(vw, fva, max_hits):
for delta in string_decoder.emulate_decoding_routine(vw, function_index, fva, ctx, max_instruction_count):
for delta_bytes in string_decoder.extract_delta_bytes(delta, ctx.decoded_at_va, fva):
for decoded_string in string_decoder.extract_strings(delta_bytes, min_length, no_filter):
@@ -146,7 +147,9 @@ def make_parser():
help="do not filter deobfuscated strings (may result in many false positive strings)",
action="store_true")
parser.add_option("--max-instruction-count", dest="max_instruction_count", type=int, default=20000,
help="maximum number of instructions to emulate per function")
help="maximum number of instructions to emulate per function (default is 20000)")
parser.add_option("--max-address-revisits", dest="max_address_revisits", type=int, default=0,
help="maximum number of address revisits per function (default is 0)")

shellcode_group = OptionGroup(parser, "Shellcode options", "Analyze raw binary file containing shellcode")
shellcode_group.add_option("-s", "--shellcode", dest="is_shellcode", help="analyze shellcode",
@@ -420,7 +423,7 @@ def filter_unique_decoded(decoded_strings):
unique_values = set()
originals = []
for decoded in decoded_strings:
hashable = (decoded.va, decoded.s, decoded.decoded_at_va, decoded.fva)
hashable = (decoded.s, decoded.decoded_at_va, decoded.fva)
if hashable not in unique_values:
unique_values.add(hashable)
originals.append(decoded)
@@ -486,10 +489,10 @@ def print_decoding_results(decoded_strings, group_functions, quiet=False, expert
:param quiet: print strings only, suppresses headers
:param expert: expert mode
"""
if not quiet:
print("\nFLOSS decoded %d strings" % len(decoded_strings))

if group_functions:
if not quiet:
print("\nFLOSS decoded %d strings" % len(decoded_strings))
fvas = set(map(lambda i: i.fva, decoded_strings))
for fva in fvas:
grouped_strings = filter(lambda ds: ds.fva == fva, decoded_strings)
@@ -499,6 +502,12 @@ def print_decoding_results(decoded_strings, group_functions, quiet=False, expert
print("\nDecoding function at 0x%X (decoded %d strings)" % (fva, len_ds))
print_decoded_strings(grouped_strings, quiet=quiet, expert=expert)
else:
if not expert:
seen = set()
decoded_strings = [x for x in decoded_strings if not (x.s in seen or seen.add(x.s))]
if not quiet:
print("\nFLOSS decoded %d strings" % len(decoded_strings))

print_decoded_strings(decoded_strings, quiet=quiet, expert=expert)
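
As a side note, the order-preserving de-duplication idiom introduced in the non-expert branch above can be illustrated standalone (toy values, not part of this commit):

    seen = set()
    strings = ["flag", "key", "flag", "host"]
    # set.add() returns None, so for a first occurrence the `or` expression is falsy
    # and the string is kept; repeats short-circuit on `s in seen` and are dropped
    unique = [s for s in strings if not (s in seen or seen.add(s))]
    # unique == ["flag", "key", "host"]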


@@ -941,7 +950,10 @@ def main(argv=None):
print_identification_results(sample_file_path, decoding_functions_candidates)

floss_logger.info("Decoding strings...")
decoded_strings = decode_strings(vw, decoding_functions_candidates, min_length, options.no_filter, options.max_instruction_count)
decoded_strings = decode_strings(vw, decoding_functions_candidates, min_length, options.no_filter,
options.max_instruction_count, options.max_address_revisits + 1)
# TODO: The de-duplication process isn't perfect: it is done both here and in print_decoding_results,
# TODO: and in both places on non-sanitized strings.
if not options.expert:
decoded_strings = filter_unique_decoded(decoded_strings)
print_decoding_results(decoded_strings, options.group_functions, quiet=options.quiet, expert=options.expert)
@@ -91,7 +91,7 @@ def memdiff(bytes1, bytes2):
return diffs


def extract_decoding_contexts(vw, function):
def extract_decoding_contexts(vw, function, max_hits):
'''
Extract the CPU and memory contexts of all calls to the given function.
Under the hood, we brute-force emulate all code paths to extract the
@@ -101,9 +101,10 @@ def extract_decoding_contexts(vw, function):
:param vw: The vivisect workspace in which the function is defined.
:type function: int
:param function: The address of the function whose contexts we'll find.
:param max_hits: The maximum number of hits per address
:rtype: Sequence[function_argument_getter.FunctionContext]
'''
return get_function_contexts(vw, function)
return get_function_contexts(vw, function, max_hits)


def emulate_decoding_routine(vw, function_index, function, context, max_instruction_count):
