Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic import recovery (cheap ImpRec style) #1448

Merged
merged 2 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions example/jitter/unpack_generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from __future__ import print_function
import os
import logging
from miasm.analysis.sandbox import Sandbox_Win_x86_32
from miasm.jitter.loader.pe import vm2pe, ImpRecStrategy
from miasm.core.locationdb import LocationDB
from miasm.jitter.jitload import JitterException

# Command line: the sandbox's own options plus the unpacker-specific ones
parser = Sandbox_Win_x86_32.parser(description="Generic & dummy unpacker")
parser.add_argument("filename", help="PE Filename")
parser.add_argument("--oep", help="Stop and dump if this address is reached")
parser.add_argument(
    '-v', "--verbose",
    help="verbose mode",
    action="store_true"
)
options = parser.parse_args()

# Instantiate the sandbox on the target file; `globals()` is handed over to
# the sandbox and relocation parsing is disabled
loc_db = LocationDB()
sb = Sandbox_Win_x86_32(loc_db, options.filename, options, globals(),
                        parse_reloc=False)

# Verbose mode lowers the log threshold to INFO (WARNING otherwise) ...
logging.basicConfig(
    level=logging.INFO if options.verbose is True else logging.WARNING
)

# ... and prints the jitter's `vm` object
if options.verbose is True:
    print(sb.jitter.vm)

def stop(jitter):
    """Breakpoint callback registered on the user-provided OEP.

    @jitter: jitter instance handed to the callback (not used here)

    Log that the address has been reached and return False, which is the
    value used here to stop the execution.
    """
    keep_running = False
    logging.info('User provided OEP reached')
    return keep_running

# Optional user-provided OEP: break on it (the address may be given in any
# base understood by `int(..., 0)`, e.g. "0x401000")
if options.oep:
    sb.jitter.add_breakpoint(int(options.oep, 0), stop)

# Let the sample run until the breakpoint or an emulation error; when an
# error ends the run, the stop address IS UNLIKELY THE ORIGINAL ENTRY POINT
try:
    sb.run()
except (JitterException, ValueError) as e:
    logging.exception(e)

# The dump is written next to the input file
out_fname = options.filename + ".dump"

# Generic (ImpRec like) attempt to rebuild the Import Table from memory;
# the recovered entries are registered in `sb.libs`
imprec = ImpRecStrategy(sb.jitter, sb.libs, 32)
imprec.recover_import()

# Rebuild a PE from the sandbox memory and write it
print("Dump to %s" % out_fname)
vm2pe(sb.jitter, out_fname, libs=sb.libs, e_orig=sb.pe)
121 changes: 121 additions & 0 deletions miasm/jitter/loader/pe.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,3 +695,124 @@ def guess_arch(pe):
"""Return the architecture specified by the PE container @pe.
If unknown, return None"""
return PE_machine.get(pe.Coffhdr.machine, None)


class ImpRecStateMachine(object):
    """
    Finite State Machine used for internal purpose only.
    See `ImpRecStrategy` for more details.

    The machine is fed pointer-sized chunks of memory, one at a time, through
    the `run` coroutine. It recognizes runs of consecutive exported function
    addresses, all belonging to the same library and terminated by a NULL
    entry. Each recognized run is appended to `self.seen` as a list of dicts
    (see `format_func_info`).
    """

    # Looking for a function pointer
    STATE_SEARCH = 0
    # Candidate function list
    STATE_FUNC_FOUND = 1
    # Function list found, terminated by a NULL entry
    STATE_END_FUNC_LIST = 2

    def __init__(self, libs, ptrtype):
        """
        @libs: object exposing `cname2addr` (name -> function address),
            `name2off` (library name -> library address) and `fad2info`
            (function address -> (library address, entry name))
        @ptrtype: `struct` format used to pack / unpack a pointer
        """
        self.ptrtype = ptrtype
        self.libs = libs
        # Packed form of every known function address, for O(1) matching
        # against the raw chunks
        self.func_addrs = set(
            struct.pack(self.ptrtype, address)
            for address in self.libs.cname2addr.values()
        )
        self.off2name = {v: k for k, v in self.libs.name2off.items()}
        self.state = self.STATE_SEARCH

        # Memory address of the chunk being processed, set by `run`
        self.cur_address = None

        # STATE_FUNC_FOUND
        self.cur_list = []
        self.cur_list_lib = None

        # Every NULL-terminated list recognized so far
        self.seen = []

    def format_func_info(self, func_info, func_addr):
        """
        Describe the entry found at the current memory address.
        @func_info: (library address, entry name), as stored in
            `libs.fad2info`
        @func_addr: address of the function inside its module
        """
        return {
            "lib_addr": func_info[0],
            "lib_name": self.off2name[func_info[0]],
            "entry_name": func_info[1],
            "entry_module_addr": func_addr,
            "entry_memory_addr": self.cur_address,
        }

    def transition(self, data):
        """
        Consume one chunk and update the machine.
        @data: raw bytes of the chunk (expected to be pointer-sized)

        Raise ValueError if the machine is in an unknown state.
        """
        if self.state == self.STATE_SEARCH:
            if data in self.func_addrs:
                self.state = self.STATE_FUNC_FOUND
                func_addr = struct.unpack(self.ptrtype, data)[0]
                func_info = self.libs.fad2info[func_addr]
                self.cur_list = [self.format_func_info(func_info, func_addr)]
                self.cur_list_lib = func_info[0]
        elif self.state == self.STATE_FUNC_FOUND:
            if data == (b"\x00" * len(data)):
                # NULL terminator: the list is complete. Record it right now,
                # so that a list ending on the very last chunk is not lost
                self.seen.append(self.cur_list)
                self.state = self.STATE_END_FUNC_LIST
            elif data in self.func_addrs:
                func_addr = struct.unpack(self.ptrtype, data)[0]
                func_info = self.libs.fad2info[func_addr]
                if func_info[0] != self.cur_list_lib:
                    # The list must belong to the same library
                    self.state = self.STATE_SEARCH
                    return
                self.cur_list.append(self.format_func_info(func_info, func_addr))
            else:
                # Neither a known function nor NULL: drop the candidate list
                # (this has to be an assignment, not a comparison)
                self.state = self.STATE_SEARCH
        elif self.state == self.STATE_END_FUNC_LIST:
            # The finished list has already been recorded; the chunk that
            # follows the terminator is handled as a fresh search
            self.state = self.STATE_SEARCH
            self.transition(data)
        else:
            raise ValueError("Unknown state: %r" % (self.state,))

    def run(self):
        """
        Coroutine driving the machine: prime it with `send(None)`, then
        `send((data, address))` for each chunk.
        """
        while True:
            data, address = yield
            self.cur_address = address
            self.transition(data)


class ImpRecStrategy(object):
    """
    Naive import reconstruction, similar to ImpRec

    It looks for a continuation of module export addresses, ended by a NULL entry, ie:
    [...]
    &Kernel32::LoadLibraryA
    &Kernel32::HeapCreate
    00 00 00 00
    [...]

    Usage:
    >>> sb = Sandbox[...]
    >>> sb.run()
    >>> imprec = ImpRecStrategy(sb.jitter, sb.libs, size=32)
    >>> imprec.recover_import()
    List<List<Recovered functions>>

    -> sb.libs has also been updated, ready to be passed to `vm2pe`
    """
    def __init__(self, jitter, libs, size):
        """
        @jitter: jitter whose memory (`jitter.vm`) is scanned
        @libs: libs object, forwarded to `ImpRecStateMachine` and updated by
            `recover_import`
        @size: pointer size in bits, 32 or 64

        Raise ValueError on any other @size.
        """
        self._jitter = jitter
        self._libs = libs
        if size == 32:
            self._ptrtype = "<I"
        elif size == 64:
            self._ptrtype = "<Q"
        else:
            # The exception has to be raised: merely building it would leave
            # the instance without `_ptrtype`
            raise ValueError("Unsupported size: %r" % (size,))
        # Pointer size in bytes: stride and width of the scanned chunks
        self._ptrsize = size // 8

    def recover_import(self, update_libs=True):
        """
        Scan the whole jitter memory for import lists.
        @update_libs: if set (default), register every recovered entry in
            `libs.lib_imp2dstad`

        Return the list of recovered lists; each entry is a dict as built by
        `ImpRecStateMachine.format_func_info`.
        """
        # Hypothesis: entries are aligned on the pointer size
        # Search for several addresses from `func_addrs` ending with a `\x00`
        ptr_size = self._ptrsize
        fsm_obj = ImpRecStateMachine(self._libs, self._ptrtype)
        fsm = fsm_obj.run()
        # Prime the coroutine
        fsm.send(None)
        for addr_start, page_info in self._jitter.vm.get_all_memory().items():
            data = page_info["data"]
            # Chunks must be as wide as a pointer, otherwise they can never
            # match a packed function address (eg. 8 bytes for 64 bit)
            for i in range(0, page_info["size"], ptr_size):
                fsm.send((data[i:i + ptr_size], addr_start + i))

        # Apply to libs
        if update_libs:
            for entry_list in fsm_obj.seen:
                for func_info in entry_list:
                    self._libs.lib_imp2dstad[func_info["lib_addr"]][func_info["entry_name"]].add(func_info["entry_memory_addr"])

        return fsm_obj.seen
6 changes: 6 additions & 0 deletions test/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ class ExampleJitterNoPython(ExampleJitter):
products=[Example.get_sample("box_upx_exe_unupx.bin")],
tags=tags.get(jitter, []))

# Register the generic unpacker example: it is run on the "box_upx.exe"
# sample with the `jitter` value in scope and the "-o" flag, and declares
# "box_upx.exe.dump" as its product
testset += ExampleJitter(
    [
        "unpack_generic.py",
        Example.get_sample("box_upx.exe"),
        "--jitter", jitter, "-o",
    ],
    products=[Example.get_sample("box_upx.exe.dump")],
    tags=tags.get(jitter, [])
)

testset += ExampleJitter(["memory_breakpoint.py",
Example.get_sample("box_upx.exe")] +
["--jitter", jitter] +
Expand Down