From 0aed13542b9a51cfa2eca15ee4b2476fda71363c Mon Sep 17 00:00:00 2001
From: Camille Mougey
Date: Sun, 23 Apr 2023 15:39:29 +0200
Subject: [PATCH 1/2] Add a "generic" import recovery strategy based on ImpRec

---
 miasm/jitter/loader/pe.py | 121 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 121 insertions(+)

diff --git a/miasm/jitter/loader/pe.py b/miasm/jitter/loader/pe.py
index 28010b748..6d359a9a2 100644
--- a/miasm/jitter/loader/pe.py
+++ b/miasm/jitter/loader/pe.py
@@ -695,3 +695,124 @@ def guess_arch(pe):
     """Return the architecture specified by the PE container @pe.
     If unknown, return None"""
     return PE_machine.get(pe.Coffhdr.machine, None)
+
+
+class ImpRecStateMachine(object):
+    """
+    Finite State Machine used for internal purpose only.
+    See `ImpRecStrategy` for more details.
+    """
+
+    # Looking for a function pointer
+    STATE_SEARCH = 0
+    # Candidate function list
+    STATE_FUNC_FOUND = 1
+    # Function list found, terminated by a NULL entry
+    STATE_END_FUNC_LIST = 2
+
+    def __init__(self, libs, ptrtype):
+        self.ptrtype = ptrtype
+        self.libs = libs
+        self.func_addrs = set(struct.pack(self.ptrtype, address) for address in self.libs.cname2addr.values())
+        self.off2name = {v:k for k,v in self.libs.name2off.items()}
+        self.state = self.STATE_SEARCH
+
+        # STATE_FUNC_FOUND
+        self.cur_list = []
+        self.cur_list_lib = None
+
+        # STATE_END_FUNC_LIST
+        self.seen = []
+
+    def format_func_info(self, func_info, func_addr):
+        return {
+            "lib_addr": func_info[0],
+            "lib_name": self.off2name[func_info[0]],
+            "entry_name": func_info[1],
+            "entry_module_addr": func_addr,
+            "entry_memory_addr": self.cur_address,
+        }
+
+    def transition(self, data):  # @data: bytes of one candidate pointer (struct format self.ptrtype)
+        if self.state == self.STATE_SEARCH:
+            if data in self.func_addrs:
+                self.state = self.STATE_FUNC_FOUND
+                func_addr = struct.unpack(self.ptrtype, data)[0]
+                func_info = self.libs.fad2info[func_addr]
+                self.cur_list = [self.format_func_info(func_info, func_addr)]
+                self.cur_list_lib = func_info[0]
+        elif self.state == self.STATE_FUNC_FOUND:
+            if data == (b"\x00" * len(data)):
+                self.state = self.STATE_END_FUNC_LIST
+            elif data in self.func_addrs:
+                func_addr = struct.unpack(self.ptrtype, data)[0]
+                func_info = self.libs.fad2info[func_addr]
+                if func_info[0] != self.cur_list_lib:
+                    # The list must belong to the same library
+                    self.state = self.STATE_SEARCH
+                    return
+                self.cur_list.append(self.format_func_info(func_info, func_addr))
+            else:
+                self.state = self.STATE_SEARCH  # neither NULL nor a known export: abandon the candidate list
+        elif self.state == self.STATE_END_FUNC_LIST:
+            self.seen.append(self.cur_list)
+            self.state = self.STATE_SEARCH
+            self.transition(data)  # re-evaluate the current entry from the SEARCH state
+        else:
+            raise ValueError()
+
+    def run(self):
+        while True:
+            data, address = yield  # fed by the caller through generator.send((data, address))
+            self.cur_address = address
+            self.transition(data)
+
+
+class ImpRecStrategy(object):
+    """
+    Naive import reconstruction, similar to ImpRec
+
+    It looks for a continuation of module export addresses, ended by a NULL entry, ie:
+    [...]
+    &Kernel32::LoadLibraryA
+    &Kernel32::HeapCreate
+    00 00 00 00
+    [...]
+
+    Usage:
+    >>> sb = Sandbox[...]
+    >>> sb.run()
+    >>> imprec = ImpRecStrategy(sb.jitter, sb.libs, size=32)
+    >>> imprec.recover_import()
+    List>
+
+    -> sb.libs has also been updated, ready to be passed to `vm2pe`
+    """
+    def __init__(self, jitter, libs, size):
+        self._jitter = jitter
+        self._libs = libs
+        if size == 32:
+            self._ptrtype = "
Date: Sun, 23 Apr 2023 15:41:00 +0200
Subject: [PATCH 2/2] Add a sandbox example using the ImpRec strategy

---
 example/jitter/unpack_generic.py | 53 ++++++++++++++++++++++++++++++++
 test/test_all.py                 |  6 ++++
 2 files changed, 59 insertions(+)
 create mode 100644 example/jitter/unpack_generic.py

diff --git a/example/jitter/unpack_generic.py b/example/jitter/unpack_generic.py
new file mode 100644
index 000000000..3329d2a93
--- /dev/null
+++ b/example/jitter/unpack_generic.py
@@ -0,0 +1,53 @@
+from __future__ import print_function
+import os
+import logging
+from miasm.analysis.sandbox import Sandbox_Win_x86_32
+from miasm.jitter.loader.pe import vm2pe, ImpRecStrategy
+from miasm.core.locationdb import LocationDB
+from miasm.jitter.jitload import JitterException
+
+parser = Sandbox_Win_x86_32.parser(description="Generic & dummy unpacker")
+parser.add_argument("filename", help="PE Filename")
+parser.add_argument("--oep", help="Stop and dump if this address is reached")
+parser.add_argument('-v', "--verbose",
+                    help="verbose mode", action="store_true")
+options = parser.parse_args()
+
+loc_db = LocationDB()
+sb = Sandbox_Win_x86_32(
+    loc_db, options.filename, options, globals(),
+    parse_reloc=False
+)
+
+if options.verbose is True:
+    logging.basicConfig(level=logging.INFO)
+else:
+    logging.basicConfig(level=logging.WARNING)
+
+if options.verbose is True:
+    print(sb.jitter.vm)
+
+def stop(jitter):
+    logging.info('User provided OEP reached')
+    # Stop execution
+    return False
+
+if options.oep:
+    # Set callbacks
+    sb.jitter.add_breakpoint(int(options.oep, 0), stop)
+
+# Run until an error is encountered - IT IS UNLIKELY THE ORIGINAL ENTRY POINT
+try:
+    sb.run()
+except (JitterException, ValueError) as e:
+    logging.exception(e)
+
+out_fname = "%s.dump" % (options.filename)
+
+# Try a generic approach to rebuild the Import Table
+imprec = ImpRecStrategy(sb.jitter, sb.libs, 32)
+imprec.recover_import()
+
+# Rebuild the PE and dump it
+print("Dump to %s" % out_fname)
+vm2pe(sb.jitter, out_fname, libs=sb.libs, e_orig=sb.pe)
diff --git a/test/test_all.py b/test/test_all.py
index 2d078bf12..2d7a11f47 100755
--- a/test/test_all.py
+++ b/test/test_all.py
@@ -799,6 +799,12 @@ class ExampleJitterNoPython(ExampleJitter):
                              products=[Example.get_sample("box_upx_exe_unupx.bin")],
                              tags=tags.get(jitter, []))
 
+    testset += ExampleJitter(["unpack_generic.py",
+                              Example.get_sample("box_upx.exe")] +
+                             ["--jitter", jitter, "-o"],
+                             products=[Example.get_sample("box_upx.exe.dump")],
+                             tags=tags.get(jitter, []))
+
     testset += ExampleJitter(["memory_breakpoint.py",
                               Example.get_sample("box_upx.exe")] +
                              ["--jitter", jitter] +