def _rebase_meta_regions(self, delta: int):
    """
    Shift every metadata region of this object by ``delta``.

    ``delta`` is added to the ``vaddr`` of each entry in ``self.meta_regions``. For a ``DataDirectory`` the same is
    done for its ``sub_regions``, and directories nested inside other directories are followed too, so that no
    sub-region is left at its pre-rebase address.

    :param delta: Signed offset added to each region's ``vaddr``.
    """
    if not delta:
        # Adding zero changes nothing; skip the traversal.
        return

    # Explicit work list instead of recursion: handles arbitrarily deep nesting without growing the call stack.
    pending = list(self.meta_regions)
    while pending:
        region = pending.pop()
        region.vaddr += delta
        if isinstance(region, DataDirectory):
            pending.extend(region.sub_regions)
def _parse_meta_regions(self):
    """
    Walk pefile's parsed data directories and build ``meta_regions`` describing the locations and layouts of PE
    metadata structures.

    The IAT is recorded as a single :class:`PointerArray`. The export, import and delay-import directories are each
    recorded as a :class:`DataDirectory` whose sub-regions describe the tables found inside them. Exports that are
    not forwarders are additionally appended to ``function_hints``.

    All addresses are stored as linked virtual addresses (linked_base + RVA).
    """
    pe = self._pe
    base = self.linked_base
    if self._arch is not None:
        is_64 = self.arch.bits == 64
    else:
        # No architecture object available: decide from the optional-header magic instead.
        is_64 = pe.OPTIONAL_HEADER.Magic == 0x20B
    ptr_size = 8 if is_64 else 4

    self._add_iat_meta_region(pe, base, ptr_size)
    if hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
        self._add_export_meta_regions(pe, base)
    if hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
        self._add_import_meta_regions(pe, base, ptr_size)
    if hasattr(pe, "DIRECTORY_ENTRY_DELAY_IMPORT"):
        self._add_delay_import_meta_regions(pe, base, ptr_size)


@staticmethod
def _pe_data_directory(pe, entry_name):
    """Return the optional-header data-directory entry of ``pe`` named ``entry_name``."""
    return pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY[entry_name]]


def _add_iat_meta_region(self, pe, base, ptr_size):
    """Record the IAT data directory as one pointer array, if the directory has an address and a size."""
    iat_dd = self._pe_data_directory(pe, "IMAGE_DIRECTORY_ENTRY_IAT")
    if not (iat_dd.VirtualAddress and iat_dd.Size):
        return
    self.meta_regions.append(
        PointerArray(
            vaddr=base + iat_dd.VirtualAddress,
            entry_size=ptr_size,
            count=iat_dd.Size // ptr_size,
            sort=MemRegionSort.IAT,
        )
    )


def _add_export_meta_regions(self, pe, base):
    """Record the export directory and its tables, and add a function hint for each non-forwarded export."""
    exp_dd = self._pe_data_directory(pe, "IMAGE_DIRECTORY_ENTRY_EXPORT")
    exp = pe.DIRECTORY_ENTRY_EXPORT
    header = exp.struct
    dir_start = exp_dd.VirtualAddress
    dir_end = dir_start + exp_dd.Size
    n_funcs = header.NumberOfFunctions
    n_names = header.NumberOfNames

    # Export directory header (IMAGE_EXPORT_DIRECTORY)
    parts = [
        MemRegion(
            vaddr=base + dir_start,
            size=header.sizeof(),
            sort=MemRegionSort.EXPORT_DIRECTORY,
        )
    ]

    # (table RVA, entry width in bytes, entry count, sort) for the three export tables
    tables = (
        (header.AddressOfFunctions, 4, n_funcs, MemRegionSort.EXPORT_ADDR_TABLE),
        (header.AddressOfNames, 4, n_names, MemRegionSort.EXPORT_NAME_TABLE),
        (header.AddressOfNameOrdinals, 2, n_names, MemRegionSort.EXPORT_ORDINAL_TABLE),
    )
    for table_rva, width, count, sort in tables:
        if table_rva and count:
            parts.append(PointerArray(vaddr=base + table_rva, entry_size=width, count=count, sort=sort))

    # Export name strings: from end of ordinals table to end of export data directory.
    if n_names and header.AddressOfNameOrdinals:
        strings_start = header.AddressOfNameOrdinals + n_names * 2
        # Only emit the blob when the ordinal table ends inside the directory; otherwise the span would
        # begin outside the export directory and cover bytes that do not belong to it.
        if dir_start <= strings_start < dir_end:
            parts.append(
                StringBlob(
                    vaddr=base + strings_start,
                    size=dir_end - strings_start,
                    sort=MemRegionSort.STRING_BLOB,
                )
            )

    self.meta_regions.append(
        DataDirectory(
            vaddr=base + dir_start,
            size=exp_dd.Size,
            sort=MemRegionSort.EXPORT_DIRECTORY,
            sub_regions=parts,
        )
    )

    # Extract function hints from exports
    for sym in exp.symbols:
        if sym.forwarder is not None:
            continue
        # Forwarder RVAs point within the export directory; skip them
        if dir_start <= sym.address < dir_end:
            continue
        # A malformed name must not abort loading: undecodable bytes are replaced rather than raising.
        name = sym.name.decode("utf-8", errors="replace") if sym.name else None
        self.function_hints.append(
            FunctionHint(
                base + sym.address,
                0,
                FunctionHintSource.EXPORT_TABLE,
                name=name,
            )
        )


def _add_import_meta_regions(self, pe, base, ptr_size):
    """Record the import directory: descriptor array, per-DLL ILTs, hint/name table and DLL name strings."""
    imp_dd = self._pe_data_directory(pe, "IMAGE_DIRECTORY_ENTRY_IMPORT")
    entries = pe.DIRECTORY_ENTRY_IMPORT

    # Import descriptor array (including null terminator)
    parts = [
        StructArray(
            vaddr=base + imp_dd.VirtualAddress,
            entry_size=20,  # sizeof(IMAGE_IMPORT_DESCRIPTOR)
            count=len(entries) + 1,  # +1 for null terminator
            sort=MemRegionSort.IMPORT_DIRECTORY,
        )
    ]

    # Per-DLL ILT arrays, while collecting the (start, end) RVA span of every hint/name entry
    hint_name_spans = []
    for entry in entries:
        ilt_rva = entry.struct.OriginalFirstThunk
        if ilt_rva:
            parts.append(
                PointerArray(
                    vaddr=base + ilt_rva,
                    entry_size=ptr_size,
                    count=len(entry.imports) + 1,  # +1 for null terminator
                    sort=MemRegionSort.ILT,
                )
            )

        for imp in entry.imports:
            rva = imp.hint_name_table_rva
            if not rva:
                continue
            # Each hint/name entry = 2 byte hint + name + null byte, word-aligned
            name_len = len(imp.name) + 1 if imp.name else 1
            entry_size = 2 + name_len
            entry_size += entry_size % 2  # round an odd size up to the next even size
            hint_name_spans.append((rva, rva + entry_size))

    # Hint/Name table blob: smallest span covering every entry seen above
    if hint_name_spans:
        hn_min = min(start for start, _ in hint_name_spans)
        hn_max = max(end for _, end in hint_name_spans)
        parts.append(
            StringBlob(
                vaddr=base + hn_min,
                size=hn_max - hn_min,
                sort=MemRegionSort.IMPORT_HINT_NAME_TABLE,
            )
        )

    # DLL name strings (pointed to by each descriptor's Name field)
    for entry in entries:
        name_rva = entry.struct.Name
        if name_rva and entry.dll:
            parts.append(
                StringBlob(
                    vaddr=base + name_rva,
                    size=len(entry.dll) + 1,  # +1 for null terminator
                    sort=MemRegionSort.STRING_BLOB,
                )
            )

    self.meta_regions.append(
        DataDirectory(
            vaddr=base + imp_dd.VirtualAddress,
            size=imp_dd.Size,
            sort=MemRegionSort.IMPORT_DIRECTORY,
            sub_regions=parts,
        )
    )


def _add_delay_import_meta_regions(self, pe, base, ptr_size):
    """Record the delay-import directory: descriptor array plus one name-table pointer array per DLL."""
    delay_dd = self._pe_data_directory(pe, "IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT")
    entries = pe.DIRECTORY_ENTRY_DELAY_IMPORT

    # Delay import descriptor array (including null terminator)
    parts = [
        StructArray(
            vaddr=base + delay_dd.VirtualAddress,
            entry_size=32,  # sizeof(IMAGE_DELAY_IMPORT_DESCRIPTOR)
            count=len(entries) + 1,
            sort=MemRegionSort.DELAY_IMPORT_DIRECTORY,
        )
    ]

    # Per-DLL delay INT arrays
    for entry in entries:
        int_rva = entry.struct.pINT
        if int_rva:
            parts.append(
                PointerArray(
                    vaddr=base + int_rva,
                    entry_size=ptr_size,
                    count=len(entry.imports) + 1,
                    sort=MemRegionSort.ILT,
                )
            )

    self.meta_regions.append(
        DataDirectory(
            vaddr=base + delay_dd.VirtualAddress,
            size=delay_dd.Size,
            sort=MemRegionSort.DELAY_IMPORT_DIRECTORY,
            sub_regions=parts,
        )
    )
class MemRegionSort(Enum):
    """Semantic tag for a metadata memory region."""

    # Generic
    POINTER_ARRAY = "pointer-array"
    STRUCT_ARRAY = "struct-array"
    STRING_BLOB = "string-blob"
    DATA = "data"

    # PE-specific
    IAT = "iat"
    ILT = "ilt"
    EXPORT_DIRECTORY = "export-directory"
    EXPORT_ADDR_TABLE = "export-addr-table"
    EXPORT_NAME_TABLE = "export-name-table"
    EXPORT_ORDINAL_TABLE = "export-ordinal-table"
    IMPORT_DIRECTORY = "import-directory"
    IMPORT_HINT_NAME_TABLE = "import-hint-name-table"
    DELAY_IMPORT_DIRECTORY = "delay-import-directory"


def _sort_label(sort) -> str:
    """Return the text shown for ``sort`` in reprs: the enum value for a ``MemRegionSort``, else ``repr(sort)``."""
    # A repr must never raise, so tolerate a sort that is not a MemRegionSort member.
    return sort.value if isinstance(sort, MemRegionSort) else repr(sort)


def _array_repr(region) -> str:
    """Return the repr shared by the fixed-size-entry array regions."""
    return (
        f"<{type(region).__name__} {_sort_label(region.sort)} @ {region.vaddr:#x}, "
        f"{region.count} x {region.entry_size} bytes>"
    )


class MemRegion:
    """
    A contiguous region of metadata in memory.

    :ivar int vaddr: Address at which the region starts.
    :ivar int size: Size of the region in bytes.
    :ivar MemRegionSort sort: Semantic tag of the region.
    """

    __slots__ = ("vaddr", "size", "sort")

    def __init__(self, vaddr: int, size: int, sort: MemRegionSort):
        self.vaddr = vaddr
        self.size = size
        self.sort = sort

    def __repr__(self):
        # type(self).__name__ lets subclasses without their own __repr__ report their real class name.
        return f"<{type(self).__name__} {_sort_label(self.sort)} @ {self.vaddr:#x}, size={self.size:#x}>"


class PointerArray(MemRegion):
    """
    An array of fixed-size pointer entries.

    ``size`` is computed once, at construction, as ``entry_size * count``.

    :ivar int entry_size: Size of one entry in bytes.
    :ivar int count: Number of entries.
    """

    __slots__ = ("entry_size", "count")

    def __init__(self, vaddr: int, entry_size: int, count: int, sort: MemRegionSort = MemRegionSort.POINTER_ARRAY):
        super().__init__(vaddr, entry_size * count, sort)
        self.entry_size = entry_size
        self.count = count

    def __repr__(self):
        return _array_repr(self)


class StructArray(MemRegion):
    """
    An array of fixed-size structures.

    ``size`` is computed once, at construction, as ``entry_size * count``.

    :ivar int entry_size: Size of one structure in bytes.
    :ivar int count: Number of structures.
    """

    __slots__ = ("entry_size", "count")

    def __init__(self, vaddr: int, entry_size: int, count: int, sort: MemRegionSort = MemRegionSort.STRUCT_ARRAY):
        super().__init__(vaddr, entry_size * count, sort)
        self.entry_size = entry_size
        self.count = count

    def __repr__(self):
        return _array_repr(self)


class StringBlob(MemRegion):
    """A blob of packed null-terminated strings."""

    __slots__ = ()

    def __init__(self, vaddr: int, size: int, sort: MemRegionSort = MemRegionSort.STRING_BLOB):
        super().__init__(vaddr, size, sort)


class DataDirectory(MemRegion):
    """
    A composite region made up of sub-regions.

    :ivar list sub_regions: The regions contained in this directory.
    """

    __slots__ = ("sub_regions",)

    def __init__(self, vaddr: int, size: int, sort: MemRegionSort, sub_regions: list[MemRegion] | None = None):
        super().__init__(vaddr, size, sort)
        # Only a missing argument gets a fresh list. A list supplied by the caller is stored as-is, even when it
        # is still empty, so the caller can keep appending to it after construction.
        self.sub_regions = [] if sub_regions is None else sub_regions

    def __repr__(self):
        return (
            f"<{type(self).__name__} {_sort_label(self.sort)} @ {self.vaddr:#x}, size={self.size:#x}, "
            f"{len(self.sub_regions)} sub-regions>"
        )

    def flat_regions(self) -> list[MemRegion]:
        """Return a new list holding the direct sub-regions (nested directories are not expanded)."""
        return list(self.sub_regions)
class TestPEMetaRegions(unittest.TestCase):
    """Test that cle's PE backend correctly exposes meta_regions."""

    @classmethod
    def setUpClass(cls):
        """Load the test binary once and share the loaded PE object between all tests."""
        test_binary = os.path.join(
            TEST_BASE, "tests", "i386", "windows", "3995b0522f1daaf8dc1341f87f34a1897ae8988e8dfa1cbe0bc98943385f4c38"
        )

        # Known layout of the test binary (PE32, ImageBase=0x76be0000):
        #   .text section:         RVA 0x1000..0x26d98
        #   IAT (dir 12):          RVA 0x1000,  size 0x4d0  -> inside .text
        #   Export dir (dir 0):    RVA 0x3440,  size 0x117d -> inside .text
        #   Import dir (dir 1):    RVA 0x24f90, size 0xc8   -> inside .text
        #   Delay import (dir 13): RVA 0x24ef8, size 0x40   -> inside .text

        cls._image_base = 0x76BE0000

        cls.loader = cle.Loader(test_binary, auto_load_libs=False)
        cls.pe_obj = cls.loader.main_object

    def _directories(self, sort):
        """Return the top-level DataDirectory meta regions tagged with ``sort``."""
        return [r for r in self.pe_obj.meta_regions if isinstance(r, DataDirectory) and r.sort == sort]

    # unittest assertion methods are used throughout: unlike bare ``assert`` they still run under ``python -O``
    # and report both operands on failure.

    def test_meta_regions_populated(self):
        """meta_regions should be non-empty for a PE with import/export tables."""
        self.assertGreater(len(self.pe_obj.meta_regions), 0)

    def test_iat_region_exists(self):
        """IAT should be present as a PointerArray with sort IAT."""
        iat_regions = [
            r
            for r in self.pe_obj.meta_regions
            if not isinstance(r, DataDirectory) and isinstance(r, PointerArray) and r.sort == MemRegionSort.IAT
        ]
        self.assertEqual(len(iat_regions), 1)
        iat = iat_regions[0]
        self.assertEqual(iat.vaddr, self._image_base + 0x1000)
        self.assertEqual(iat.size, 0x4D0)
        self.assertEqual(iat.entry_size, 4)

    def test_export_directory_exists(self):
        """Export directory should be present as a DataDirectory."""
        exp_dirs = self._directories(MemRegionSort.EXPORT_DIRECTORY)
        self.assertEqual(len(exp_dirs), 1)
        exp = exp_dirs[0]
        self.assertEqual(exp.vaddr, self._image_base + 0x3440)
        self.assertEqual(exp.size, 0x117D)
        # Should have sub-regions: header, func table, name table, ordinal table, name strings
        self.assertGreaterEqual(len(exp.sub_regions), 4)

    def test_import_directory_exists(self):
        """Import directory should be present as a DataDirectory."""
        imp_dirs = self._directories(MemRegionSort.IMPORT_DIRECTORY)
        self.assertEqual(len(imp_dirs), 1)
        self.assertEqual(imp_dirs[0].vaddr, self._image_base + 0x24F90)

    def test_delay_import_directory_exists(self):
        """Delay import directory should be present."""
        delay_dirs = self._directories(MemRegionSort.DELAY_IMPORT_DIRECTORY)
        self.assertEqual(len(delay_dirs), 1)
        self.assertEqual(delay_dirs[0].vaddr, self._image_base + 0x24EF8)


if __name__ == "__main__":
    unittest.main()