Skip to content

Commit

Permalink
Memory map provider (#1003)
Browse files Browse the repository at this point in the history
This change allows architectures to provide their own memory map by
defining `maps` in the architecture. If set, then `GefMemoryManager`
will defer to it when building maps.

This can be used for e.g. architectures that are built for use with
gdbserver.

It also adds `parse_info_mem` (Renaming the old one to
`parse_monitor_info_mem`) which parses the maps from this command. The
Black Magic Probe provides memory maps of its targets via this command.
  • Loading branch information
Grazfather committed Dec 18, 2023
1 parent 17c496c commit 4f20983
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 56 deletions.
124 changes: 85 additions & 39 deletions gef.py
Expand Up @@ -164,6 +164,8 @@ def update_gef(argv: List[str]) -> int:
__registered_architectures__ : Dict[Union["Elf.Abi", str], Type["Architecture"]] = {}
__registered_file_formats__ : Set[ Type["FileFormat"] ] = set()

GefMemoryMapProvider = Callable[[], Generator["Section", None, None]]


def reset_all_caches() -> None:
"""Free all caches. If an object is cached, it will have a callable attribute `cache_clear`
Expand Down Expand Up @@ -644,14 +646,22 @@ def from_process_maps(cls, perm_str: str) -> "Permission":
return perm

@classmethod
def from_info_mem(cls, perm_str: str) -> "Permission":
def from_monitor_info_mem(cls, perm_str: str) -> "Permission":
perm = cls(0)
# perm_str[0] shows if this is a user page, which
# we don't track
if perm_str[1] == "r": perm |= Permission.READ
if perm_str[2] == "w": perm |= Permission.WRITE
return perm

@classmethod
def from_info_mem(cls, perm_str: str) -> "Permission":
perm = cls(0)
if "r" in perm_str: perm |= Permission.READ
if "w" in perm_str: perm |= Permission.WRITE
if "x" in perm_str: perm |= Permission.EXECUTE
return perm


class Section:
"""GEF representation of process memory sections."""
Expand Down Expand Up @@ -2259,6 +2269,7 @@ class Architecture(ArchitectureBase):
_ptrsize: Optional[int] = None
_endianness: Optional[Endianness] = None
special_registers: Union[Tuple[()], Tuple[str, ...]] = ()
maps: Optional[GefMemoryMapProvider] = None

def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
Expand Down Expand Up @@ -10421,23 +10432,38 @@ def read_ascii_string(self, address: int) -> Optional[str]:
@property
def maps(self) -> List[Section]:
if not self.__maps:
self.__maps = self.__parse_maps()
self.__maps = self._parse_maps()
return self.__maps

def __parse_maps(self) -> List[Section]:
"""Return the mapped memory sections"""
@classmethod
def _parse_maps(cls) -> List[Section]:
"""Return the mapped memory sections. If the current arch has its maps
method defined, then defer to that to generated maps, otherwise, try to
figure it out from procfs, then info sections, then monitor info
mem."""
if gef.arch.maps is not None:
return list(gef.arch.maps())

try:
if is_qemu_system():
return list(self.__parse_info_mem())
except gdb.error:
# Target may not support this command
return list(cls.parse_procfs_maps())
except:
pass

try:
return list(cls.parse_gdb_info_sections())
except:
pass

try:
return list(self.__parse_procfs_maps())
except FileNotFoundError:
return list(self.__parse_gdb_info_sections())
return list(cls.parse_monitor_info_mem())
except:
pass

def __parse_procfs_maps(self) -> Generator[Section, None, None]:
warn("Cannot get memory map")
return None

@staticmethod
def parse_procfs_maps() -> Generator[Section, None, None]:
"""Get the memory mapping from procfs."""
procfs_mapfile = gef.session.maps
if not procfs_mapfile:
Expand All @@ -10460,14 +10486,15 @@ def __parse_procfs_maps(self) -> Generator[Section, None, None]:
perm = Permission.from_process_maps(perm)
inode = int(inode)
yield Section(page_start=addr_start,
page_end=addr_end,
offset=off,
permission=perm,
inode=inode,
path=pathname)
page_end=addr_end,
offset=off,
permission=perm,
inode=inode,
path=pathname)
return

def __parse_gdb_info_sections(self) -> Generator[Section, None, None]:
@staticmethod
def parse_gdb_info_sections() -> Generator[Section, None, None]:
"""Get the memory mapping from GDB's command `maintenance info sections` (limited info)."""
stream = StringIO(gdb.execute("maintenance info sections", to_string=True))

Expand All @@ -10481,41 +10508,60 @@ def __parse_gdb_info_sections(self) -> Generator[Section, None, None]:
off = int(parts[3][:-1], 16)
path = parts[4]
perm = Permission.from_info_sections(parts[5:])
yield Section(
page_start=addr_start,
page_end=addr_end,
offset=off,
permission=perm,
inode="",
path=path
)
yield Section(page_start=addr_start,
page_end=addr_end,
offset=off,
permission=perm,
path=path)

except IndexError:
continue
except ValueError:
continue
return

def __parse_info_mem(self) -> Generator[Section, None, None]:
"""Get the memory mapping from GDB's command `monitor info mem`"""
for line in StringIO(gdb.execute("monitor info mem", to_string=True)):
if not line:
break
@staticmethod
def parse_monitor_info_mem() -> Generator[Section, None, None]:
"""Get the memory mapping from GDB's command `monitor info mem`
This can raise an exception, which the memory manager takes to mean
that this method does not work to get a map.
"""
stream = StringIO(gdb.execute("monitor info mem", to_string=True))

for line in stream:
try:
ranges, off, perms = line.split()
off = int(off, 16)
start, end = [int(s, 16) for s in ranges.split("-")]
except ValueError as e:
continue

perm = Permission.from_info_mem(perms)
yield Section(
page_start=start,
page_end=end,
offset=off,
permission=perm,
inode="",
)
perm = Permission.from_monitor_info_mem(perms)
yield Section(page_start=start,
page_end=end,
offset=off,
permission=perm)

@staticmethod
def parse_info_mem():
"""Get the memory mapping from GDB's command `info mem`. This can be
provided by certain gdbserver implementations."""
for line in StringIO(gdb.execute("info mem", to_string=True)):
# Using memory regions provided by the target.
# Num Enb Low Addr High Addr Attrs
# 0 y 0x10000000 0x10200000 flash blocksize 0x1000 nocache
# 1 y 0x20000000 0x20042000 rw nocache
_, en, start, end, *attrs = line.split()
if en != "y":
continue

if "flash" in attrs:
perm = Permission.from_info_mem("r")
else:
perm = Permission.from_info_mem("rw")
yield Section(page_start=int(start, 0),
page_end=int(end, 0),
permission=perm)


class GefHeapManager(GefManager):
Expand Down
64 changes: 60 additions & 4 deletions tests/api/misc.py
Expand Up @@ -12,7 +12,12 @@
_target,
gdb_start_silent_cmd,
gdb_test_python_method,
gdb_run_cmd,
gdbserver_session,
qemuuser_session,
GefUnitTestGeneric,
GDBSERVER_DEFAULT_HOST,
GDBSERVER_DEFAULT_PORT,
)


Expand All @@ -26,7 +31,6 @@ def test_func_which(self):
res = gdb_test_python_method("which('__IDontExist__')")
self.assertIn("Missing file `__IDontExist__`", res)


def test_func_gef_convenience(self):
func = "gef_convenience('meh')"
res = gdb_test_python_method(func, target=_target("default"))
Expand All @@ -41,18 +45,70 @@ def test_func_parse_address(self):
res = gdb_test_python_method(func)
self.assertException(res)

def test_func_parse_permissions(self):
func = "Permission.from_info_sections('ALLOC LOAD READONLY CODE HAS_CONTENTS')"
res = gdb_test_python_method(func)
self.assertNoException(res)

func = "Permission.from_process_maps('r--')"
res = gdb_test_python_method(func)
self.assertNoException(res)

func = "Permission.from_monitor_info_mem('-r-')"
res = gdb_test_python_method(func)
self.assertNoException(res)

func = "Permission.from_info_mem('rw')"
res = gdb_test_python_method(func)
self.assertNoException(res)

def test_func_parse_maps(self):
func = "Permission.from_info_sections(' [10] 0x555555574000->0x55555557401b at 0x00020000: .init ALLOC LOAD READONLY CODE HAS_CONTENTS')"
func = "list(GefMemoryManager.parse_procfs_maps())"
res = gdb_test_python_method(func)
self.assertNoException(res)
assert "Section" in res

func = "Permission.from_process_maps('0x0000555555554000 0x0000555555574000 0x0000000000000000 r-- /usr/bin/bash')"
func = "list(GefMemoryManager.parse_gdb_info_sections())"
res = gdb_test_python_method(func)
self.assertNoException(res)
assert "Section" in res

# When in a gef-remote session `parse_gdb_info_sections` should work to
# query the memory maps
port = GDBSERVER_DEFAULT_PORT + 1
before = [f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}"]
with gdbserver_session(port=port) as _:
func = "list(GefMemoryManager.parse_gdb_info_sections())"
res = gdb_test_python_method(func)
self.assertNoException(res)
assert "Section" in res

# When in a gef-remote qemu-user session `parse_gdb_info_sections`
# should work to query the memory maps
port = GDBSERVER_DEFAULT_PORT + 2
target = _target("default")
before = [
f"gef-remote --qemu-user --qemu-binary {target} {GDBSERVER_DEFAULT_HOST} {port}"]
with qemuuser_session(port=port) as _:
func = "list(GefMemoryManager.parse_gdb_info_sections())"
res = gdb_test_python_method(func)
self.assertNoException(res)
assert "Section" in res

func = "Permission.from_info_mem('ffffff2a65e0b000-ffffff2a65e0c000 0000000000001000 -r-')"
# Running the _parse_maps method should just find the correct one
func = "list(GefMemoryManager._parse_maps())"
res = gdb_test_python_method(func)
self.assertNoException(res)
assert "Section" in res

# The parse maps function should automatically get called when we start
# up, and we should be able to view the maps via the `gef.memory.maps`
# property.
func = "gef.memory.maps"
res = gdb_test_python_method(func)
self.assertNoException(res)
assert "Section" in res


@pytest.mark.slow
@pytest.mark.online
Expand Down
30 changes: 17 additions & 13 deletions tests/commands/gef_remote.py
Expand Up @@ -3,45 +3,49 @@
"""


from tests.utils import (GefUnitTestGeneric, _target, gdb_run_cmd,
gdbserver_session, qemuuser_session)

GDBSERVER_PREFERED_HOST = "localhost"
GDBSERVER_PREFERED_PORT = 1234
from tests.utils import (
GefUnitTestGeneric,
_target,
gdb_run_cmd,
gdbserver_session,
qemuuser_session,
GDBSERVER_DEFAULT_HOST,
GDBSERVER_DEFAULT_PORT,
)

class GefRemoteCommand(GefUnitTestGeneric):
"""`gef_remote` command test module"""


def test_cmd_gef_remote(self):
port = GDBSERVER_PREFERED_PORT + 1
before = [f"gef-remote {GDBSERVER_PREFERED_HOST} {port}"]
port = GDBSERVER_DEFAULT_PORT + 1
before = [f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}"]
with gdbserver_session(port=port) as _:
res = gdb_run_cmd(
"pi print(gef.session.remote)", before=before)
self.assertNoException(res)
self.assertIn(
f"RemoteSession(target='{GDBSERVER_PREFERED_HOST}:{port}', local='/tmp/", res)
f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/tmp/", res)
self.assertIn(", qemu_user=False)", res)


def test_cmd_gef_remote_qemu_user(self):
port = GDBSERVER_PREFERED_PORT + 2
port = GDBSERVER_DEFAULT_PORT + 2
target = _target("default")
before = [
f"gef-remote --qemu-user --qemu-binary {target} {GDBSERVER_PREFERED_HOST} {port}"]
f"gef-remote --qemu-user --qemu-binary {target} {GDBSERVER_DEFAULT_HOST} {port}"]
with qemuuser_session(port=port) as _:
res = gdb_run_cmd(
"pi print(gef.session.remote)", before=before)
self.assertNoException(res)
self.assertIn(
f"RemoteSession(target='{GDBSERVER_PREFERED_HOST}:{port}', local='/tmp/", res)
f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/tmp/", res)
self.assertIn(", qemu_user=True)", res)


def test_cmd_target_remote(self):
port = GDBSERVER_PREFERED_PORT + 3
before = [f"target remote {GDBSERVER_PREFERED_HOST}:{port}"]
port = GDBSERVER_DEFAULT_PORT + 3
before = [f"target remote {GDBSERVER_DEFAULT_HOST}:{port}"]
with gdbserver_session(port=port) as _:
res = gdb_run_cmd(
"pi print(gef.session.remote)", before=before)
Expand Down

0 comments on commit 4f20983

Please sign in to comment.