Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions cle/backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cle.address_translator import AT
from cle.errors import CLEError, CLEOperationError
from cle.memory import Clemory
from cle.structs import DataDirectory

from .regions import Regions
from .relocation import Relocation
Expand All @@ -20,6 +21,7 @@
if TYPE_CHECKING:
from cle.backends import Section, Segment
from cle.loader import Loader
from cle.structs import MemRegion

log = logging.getLogger(name=__name__)

Expand All @@ -31,6 +33,7 @@ class FunctionHintSource:

EH_FRAME = 0
EXTERNAL_EH_FRAME = 1
EXPORT_TABLE = 2


class FunctionHint:
Expand All @@ -41,14 +44,16 @@ class FunctionHint:
:ivar int size: Size of the function.
:ivar source: Source of this hint.
:vartype source: int
:ivar str | None name: Optional symbol name, if known.
"""

__slots__ = ("addr", "size", "source")
__slots__ = ("addr", "size", "source", "name")

def __init__(self, addr, size, source):
def __init__(self, addr, size, source, name=None):
self.addr = addr
self.size = size
self.source = source
self.name = name

def __repr__(self):
return f"<FuncHint@{self.addr:#x}, {self.size} bytes>"
Expand Down Expand Up @@ -242,6 +247,10 @@ def __init__(
# they should be rebased when .rebase() is called
self.function_hints: list[FunctionHint] = []

# Metadata regions (e.g., PE import/export tables)
# they should be rebased when .rebase() is called
self.meta_regions: list[MemRegion] = []

# line number mapping
self.addr_to_line = {}

Expand Down Expand Up @@ -375,6 +384,16 @@ def rebase(self, new_base):
for hint in self.function_hints:
hint.addr = hint.addr + self.image_base_delta

self._rebase_meta_regions(self.image_base_delta)

def _rebase_meta_regions(self, delta: int):
    """Shift the virtual address of every metadata region by ``delta``.

    ``DataDirectory`` regions carry nested sub-regions; those are shifted
    as well so the whole hierarchy stays consistent after a rebase.
    """
    for reg in self.meta_regions:
        reg.vaddr = reg.vaddr + delta
        if not isinstance(reg, DataDirectory):
            continue
        for child in reg.sub_regions:
            child.vaddr = child.vaddr + delta

def relocate(self):
"""
Apply all resolved relocations to memory.
Expand Down Expand Up @@ -497,7 +516,7 @@ def initial_register_values(self):
Deprecated
"""
log.critical(
"Deprecation warning: initial_register_values is deprecated - " "use backend.thread_registers() instead"
"Deprecation warning: initial_register_values is deprecated - use backend.thread_registers() instead"
)
return self.thread_registers().items()

Expand Down
241 changes: 241 additions & 0 deletions cle/backends/pe/pe.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cle.address_translator import AT
from cle.backends.backend import Backend, FunctionHint, FunctionHintSource, register_backend
from cle.backends.symbol import SymbolType
from cle.structs import DataDirectory, MemRegion, MemRegionSort, PointerArray, StringBlob, StructArray
from cle.utils import extract_null_terminated_bytestr

from .regions import PESection
Expand Down Expand Up @@ -127,6 +128,7 @@ def __init__(
self._handle_imports()
self._handle_exports()
self._handle_seh()
self._parse_meta_regions()
if self.loader._perform_relocations:
# parse base relocs
self._pe.parse_data_directories(directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_BASERELOC"]])
Expand Down Expand Up @@ -385,6 +387,245 @@ def _handle_seh(self):
)
)

def _parse_meta_regions(self):
    """
    Walk pefile's parsed data directories and build ``meta_regions`` describing the locations and layouts of PE
    metadata structures: the IAT, the export directory, the import directory, and the delay-import directory.

    All addresses are stored as linked virtual addresses (``linked_base`` + RVA); the regions are therefore
    expected to be rebased together with the image (see ``Backend.rebase``).

    Side effects: appends to ``self.meta_regions`` and, for non-forwarded exports, to ``self.function_hints``.
    """
    pe = self._pe
    base = self.linked_base
    # Pointer width: trust the resolved arch when available; otherwise fall back to the
    # optional-header magic (0x20B is the PE32+ / 64-bit magic value).
    is_64 = self.arch.bits == 64 if self._arch is not None else (pe.OPTIONAL_HEADER.Magic == 0x20B)
    ptr_size = 8 if is_64 else 4

    # --- IAT (Data Directory 12) ---
    # A flat array of ``ptr_size`` pointers; only recorded when the directory is present and non-empty.
    iat_idx = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_IAT"]
    iat_dd = pe.OPTIONAL_HEADER.DATA_DIRECTORY[iat_idx]
    if iat_dd.VirtualAddress and iat_dd.Size:
        self.meta_regions.append(
            PointerArray(
                vaddr=base + iat_dd.VirtualAddress,
                entry_size=ptr_size,
                count=iat_dd.Size // ptr_size,  # truncates if Size isn't pointer-aligned
                sort=MemRegionSort.IAT,
            )
        )

    # --- Export Directory (Data Directory 0) ---
    # Only present when pefile has already parsed the export directory for this binary.
    if hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
        exp_dd = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]]
        exp = pe.DIRECTORY_ENTRY_EXPORT
        exp_struct = exp.struct

        sub_regions: list[MemRegion] = []

        # Export directory header (IMAGE_EXPORT_DIRECTORY, 40 bytes)
        sub_regions.append(
            MemRegion(
                vaddr=base + exp_dd.VirtualAddress,
                size=exp_struct.sizeof(),
                sort=MemRegionSort.EXPORT_DIRECTORY,
            )
        )

        n_funcs = exp_struct.NumberOfFunctions
        n_names = exp_struct.NumberOfNames

        # AddressOfFunctions array: one 4-byte RVA per exported function
        if exp_struct.AddressOfFunctions and n_funcs:
            sub_regions.append(
                PointerArray(
                    vaddr=base + exp_struct.AddressOfFunctions,
                    entry_size=4,
                    count=n_funcs,
                    sort=MemRegionSort.EXPORT_ADDR_TABLE,
                )
            )

        # AddressOfNames array: one 4-byte RVA per named export
        if exp_struct.AddressOfNames and n_names:
            sub_regions.append(
                PointerArray(
                    vaddr=base + exp_struct.AddressOfNames,
                    entry_size=4,
                    count=n_names,
                    sort=MemRegionSort.EXPORT_NAME_TABLE,
                )
            )

        # AddressOfNameOrdinals array: one 2-byte ordinal per named export
        if exp_struct.AddressOfNameOrdinals and n_names:
            sub_regions.append(
                PointerArray(
                    vaddr=base + exp_struct.AddressOfNameOrdinals,
                    entry_size=2,
                    count=n_names,
                    sort=MemRegionSort.EXPORT_ORDINAL_TABLE,
                )
            )

        # Export name strings: heuristically taken as the span from the end of the ordinals
        # table to the end of the export data directory. NOTE(review): this assumes the
        # strings are laid out immediately after the ordinal table and covered by exp_dd.Size,
        # which is the common linker layout but not mandated by the PE spec.
        if n_names and exp_struct.AddressOfNameOrdinals:
            strings_start = exp_struct.AddressOfNameOrdinals + n_names * 2
            strings_end = exp_dd.VirtualAddress + exp_dd.Size
            if strings_end > strings_start:
                sub_regions.append(
                    StringBlob(
                        vaddr=base + strings_start,
                        size=strings_end - strings_start,
                        sort=MemRegionSort.STRING_BLOB,
                    )
                )

        self.meta_regions.append(
            DataDirectory(
                vaddr=base + exp_dd.VirtualAddress,
                size=exp_dd.Size,
                sort=MemRegionSort.EXPORT_DIRECTORY,
                sub_regions=sub_regions,
            )
        )

        # Extract function hints from exports. Size 0 means "unknown extent".
        exp_rva_start = exp_dd.VirtualAddress
        exp_rva_end = exp_dd.VirtualAddress + exp_dd.Size
        for sym in exp.symbols:
            # Skip forwarders pefile already resolved ...
            if sym.forwarder is not None:
                continue
            # ... and any remaining forwarder-style entries: a forwarder's RVA points
            # back inside the export directory rather than at code.
            if exp_rva_start <= sym.address < exp_rva_end:
                continue
            name = sym.name.decode() if sym.name else None
            self.function_hints.append(
                FunctionHint(
                    base + sym.address,
                    0,
                    FunctionHintSource.EXPORT_TABLE,
                    name=name,
                )
            )

    # --- Import Directory (Data Directory 1) ---
    if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
        imp_dd = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_IMPORT"]]
        entries = pe.DIRECTORY_ENTRY_IMPORT

        sub_regions: list[MemRegion] = []

        # Import descriptor array (including null terminator)
        n_descs = len(entries) + 1  # +1 for null terminator
        sub_regions.append(
            StructArray(
                vaddr=base + imp_dd.VirtualAddress,
                entry_size=20,  # sizeof(IMAGE_IMPORT_DESCRIPTOR)
                count=n_descs,
                sort=MemRegionSort.IMPORT_DIRECTORY,
            )
        )

        # Per-DLL ILT arrays; simultaneously track the min/max extent of the
        # hint/name table so it can be recorded as one contiguous blob below.
        hn_min = None
        hn_max = None
        for entry in entries:
            ilt_rva = entry.struct.OriginalFirstThunk
            if ilt_rva:
                n_imports = len(entry.imports) + 1  # +1 for null terminator
                sub_regions.append(
                    PointerArray(
                        vaddr=base + ilt_rva,
                        entry_size=ptr_size,
                        count=n_imports,
                        sort=MemRegionSort.ILT,
                    )
                )

            # Track hint/name table extent
            for imp in entry.imports:
                if imp.hint_name_table_rva:
                    rva = imp.hint_name_table_rva
                    # Each hint/name entry = 2 byte hint + name + null byte, word-aligned
                    name_len = len(imp.name) + 1 if imp.name else 1
                    entry_size = 2 + name_len
                    if entry_size % 2:
                        entry_size += 1  # pad to even (IMAGE_IMPORT_BY_NAME alignment)
                    entry_end = rva + entry_size
                    if hn_min is None or rva < hn_min:
                        hn_min = rva
                    if hn_max is None or entry_end > hn_max:
                        hn_max = entry_end

        # Hint/Name table blob (single span covering all per-import entries seen above)
        if hn_min is not None and hn_max is not None:
            sub_regions.append(
                StringBlob(
                    vaddr=base + hn_min,
                    size=hn_max - hn_min,
                    sort=MemRegionSort.IMPORT_HINT_NAME_TABLE,
                )
            )

        # DLL name strings (pointed to by each descriptor's Name field)
        for entry in entries:
            name_rva = entry.struct.Name
            if name_rva and entry.dll:
                sub_regions.append(
                    StringBlob(
                        vaddr=base + name_rva,
                        size=len(entry.dll) + 1,  # +1 for null terminator
                        sort=MemRegionSort.STRING_BLOB,
                    )
                )

        self.meta_regions.append(
            DataDirectory(
                vaddr=base + imp_dd.VirtualAddress,
                size=imp_dd.Size,
                sort=MemRegionSort.IMPORT_DIRECTORY,
                sub_regions=sub_regions,
            )
        )

    # --- Delay Import Directory (Data Directory 13) ---
    if hasattr(pe, "DIRECTORY_ENTRY_DELAY_IMPORT"):
        delay_dd = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT"]]
        entries = pe.DIRECTORY_ENTRY_DELAY_IMPORT

        sub_regions: list[MemRegion] = []

        # Delay import descriptor array (including null terminator)
        n_descs = len(entries) + 1
        sub_regions.append(
            StructArray(
                vaddr=base + delay_dd.VirtualAddress,
                entry_size=32,  # sizeof(IMAGE_DELAYLOAD_DESCRIPTOR)
                count=n_descs,
                sort=MemRegionSort.DELAY_IMPORT_DIRECTORY,
            )
        )

        # Per-DLL delay import name table (INT) arrays; reuses the ILT sort since the
        # layout is identical to a regular import lookup table.
        for entry in entries:
            int_rva = entry.struct.pINT
            if int_rva:
                n_imports = len(entry.imports) + 1
                sub_regions.append(
                    PointerArray(
                        vaddr=base + int_rva,
                        entry_size=ptr_size,
                        count=n_imports,
                        sort=MemRegionSort.ILT,
                    )
                )

        self.meta_regions.append(
            DataDirectory(
                vaddr=base + delay_dd.VirtualAddress,
                size=delay_dd.Size,
                sort=MemRegionSort.DELAY_IMPORT_DIRECTORY,
                sub_regions=sub_regions,
            )
        )

def __register_relocs(self):
if not hasattr(self._pe, "DIRECTORY_ENTRY_BASERELOC"):
log.debug("%s has no relocations", self.binary)
Expand Down
12 changes: 12 additions & 0 deletions cle/structs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Public API of the :mod:`cle.structs` package.

Re-exports the memory-region description types used to annotate binary
metadata layouts (e.g. PE data directories and their nested tables).
"""

from __future__ import annotations

from .mem_regions import DataDirectory, MemRegion, MemRegionSort, PointerArray, StringBlob, StructArray

__all__ = [
    "MemRegion",
    "MemRegionSort",
    "PointerArray",
    "StructArray",
    "DataDirectory",
    "StringBlob",
]
Loading
Loading