Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix searching when connected to qemu-system instance #906

Merged
merged 7 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 57 additions & 13 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,23 +627,32 @@ def __str__(self) -> str:
perm_str += "x" if self & Permission.EXECUTE else "-"
return perm_str

@staticmethod
def from_info_sections(*args: str) -> "Permission":
perm = Permission(0)
@classmethod
def from_info_sections(cls, *args: str) -> "Permission":
perm = cls(0)
for arg in args:
if "READONLY" in arg: perm |= Permission.READ
if "DATA" in arg: perm |= Permission.WRITE
if "CODE" in arg: perm |= Permission.EXECUTE
return perm

@staticmethod
def from_process_maps(perm_str: str) -> "Permission":
perm = Permission(0)
@classmethod
def from_process_maps(cls, perm_str: str) -> "Permission":
perm = cls(0)
if perm_str[0] == "r": perm |= Permission.READ
if perm_str[1] == "w": perm |= Permission.WRITE
if perm_str[2] == "x": perm |= Permission.EXECUTE
return perm

@classmethod
def from_info_mem(cls, perm_str: str) -> "Permission":
perm = cls(0)
# perm_str[0] shows if this is a user page, which
# we don't track
if perm_str[1] == "r": perm |= Permission.READ
if perm_str[2] == "w": perm |= Permission.WRITE
return perm


class Section:
"""GEF representation of process memory sections."""
Expand Down Expand Up @@ -3343,24 +3352,24 @@ def get_os() -> str:
def is_qemu() -> bool:
if not is_remote_debug():
return False
response = gdb.execute('maintenance packet Qqemu.sstepbits', to_string=True, from_tty=False)
return 'ENABLE=' in response
response = gdb.execute("maintenance packet Qqemu.sstepbits", to_string=True, from_tty=False)
return "ENABLE=" in response


@lru_cache()
def is_qemu_usermode() -> bool:
if not is_qemu():
return False
response = gdb.execute('maintenance packet QOffsets', to_string=True, from_tty=False)
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False)
return "Text=" in response


@lru_cache()
def is_qemu_system() -> bool:
if not is_qemu():
return False
response = gdb.execute('maintenance packet QOffsets', to_string=True, from_tty=False)
clubby789 marked this conversation as resolved.
Show resolved Hide resolved
return 'received: ""' in response
response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False)
return "received: \"\"" in response


def get_filepath() -> Optional[str]:
Expand Down Expand Up @@ -3535,6 +3544,7 @@ def exit_handler(_: "gdb.ExitedEvent") -> None:
gef.session.remote.close()
del gef.session.remote
gef.session.remote = None
gef.session.remote_initializing = False
return


Expand Down Expand Up @@ -3748,7 +3758,7 @@ def is_in_x86_kernel(address: int) -> bool:

def is_remote_debug() -> bool:
""""Return True is the current debugging session is running through GDB remote session."""
return gef.session.remote is not None
return gef.session.remote_initializing or gef.session.remote is not None


def de_bruijn(alphabet: bytes, n: int) -> Generator[str, None, None]:
Expand Down Expand Up @@ -5948,7 +5958,12 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None:
return

# try to establish the remote session, throw on error
# Set `.remote_initializing` to True here - `GefRemoteSessionManager` invokes code which
# calls `is_remote_debug` which checks if `remote_initializing` is True or `.remote` is None
# This prevents some spurious errors being thrown during startup
gef.session.remote_initializing = True
gef.session.remote = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary)
gef.session.remote_initializing = False
reset_all_caches()
gdb.execute("context")
return
Expand Down Expand Up @@ -10187,6 +10202,12 @@ def maps(self) -> List[Section]:

def __parse_maps(self) -> List[Section]:
"""Return the mapped memory sections"""
try:
if is_qemu_system():
return list(self.__parse_info_mem())
clubby789 marked this conversation as resolved.
Show resolved Hide resolved
except gdb.error:
# Target may not support this command
pass
try:
return list(self.__parse_procfs_maps())
except FileNotFoundError:
Expand Down Expand Up @@ -10251,6 +10272,27 @@ def __parse_gdb_info_sections(self) -> Generator[Section, None, None]:
continue
return

def __parse_info_mem(self) -> Generator[Section, None, None]:
"""Get the memory mapping from GDB's command `monitor info mem`"""
for line in StringIO(gdb.execute("monitor info mem", to_string=True)):
if not line:
break
try:
ranges, off, perms = line.split()
off = int(off, 16)
start, end = [int(s, 16) for s in ranges.split("-")]
except ValueError as e:
continue

perm = Permission.from_info_mem(perms)
yield Section(
page_start=start,
page_end=end,
offset=off,
permission=perm,
inode="",
)


class GefHeapManager(GefManager):
"""Class managing session heap."""
Expand Down Expand Up @@ -10429,6 +10471,7 @@ class GefSessionManager(GefManager):
def __init__(self) -> None:
self.reset_caches()
self.remote: Optional["GefRemoteSessionManager"] = None
self.remote_initializing: bool = False
self.qemu_mode: bool = False
self.convenience_vars_index: int = 0
self.heap_allocated_chunks: List[Tuple[int, int]] = []
Expand Down Expand Up @@ -10461,7 +10504,8 @@ def __str__(self) -> str:
def auxiliary_vector(self) -> Optional[Dict[str, int]]:
if not is_alive():
return None

if is_qemu_system():
return None
if not self._auxiliary_vector:
auxiliary_vector = {}
auxv_info = gdb.execute("info auxv", to_string=True)
Expand Down
12 changes: 12 additions & 0 deletions tests/api/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ def test_func_parse_address(self):
res = gdb_test_python_method(func)
self.assertException(res)

def test_func_parse_maps(self):
func = "Permission.from_info_sections(' [10] 0x555555574000->0x55555557401b at 0x00020000: .init ALLOC LOAD READONLY CODE HAS_CONTENTS')"
res = gdb_test_python_method(func)
self.assertNoException(res)

func = "Permission.from_process_maps('0x0000555555554000 0x0000555555574000 0x0000000000000000 r-- /usr/bin/bash')"
res = gdb_test_python_method(func)
self.assertNoException(res)

func = "Permission.from_info_mem('ffffff2a65e0b000-ffffff2a65e0c000 0000000000001000 -r-')"
res = gdb_test_python_method(func)
self.assertNoException(res)

@pytest.mark.slow
@pytest.mark.online
Expand Down