Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to resolve type when dumping heap summary. #1030

Merged
merged 5 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/commands/heap.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ gef➤ heap chunks --summary

![heap-chunks-summary](https://i.imgur.com/3HTgtwX.png)

Sometimes, multiple types of objects could have the same size, hence it might not be enough only
knowing the chunk size when debugging issues like memory leaks. GEF supports using the vtable to
determine the type of the object stored in the chunk. To enable this feature, use `--resolve` along
with the `--summary` flag.

```text
gef➤ heap chunks --summary --resolve
```

![heap-chunks-summary-resolve](https://i.imgur.com/2Mm0JF6.png)

Heap chunk command also supports filtering chunks by their size. To do so, simply provide the
`--min-size` or `--max-size` argument:

Expand Down
48 changes: 30 additions & 18 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -1710,6 +1710,15 @@ def psprint(self) -> str:
msg.append(f"\n\n{self._str_pointers()}")
return "\n".join(msg) + "\n"

def resolve_type(self) -> str:
ptr_data = gef.memory.read_integer(self.data_address)
if ptr_data != 0:
sym = gdb_get_location_from_symbol(ptr_data)
if sym is not None and "vtable for" in sym[0]:
return sym[0].replace("vtable for ", "")

return ""


class GlibcFastChunk(GlibcChunk):

Expand Down Expand Up @@ -1999,7 +2008,6 @@ def gdb_lookup_symbol(sym: str) -> Optional[Tuple[Optional[str], Optional[Tuple[
except gdb.error:
return None


@lru_cache(maxsize=512)
def gdb_get_location_from_symbol(address: int) -> Optional[Tuple[str, int]]:
"""Retrieve the location of the `address` argument from the symbol table.
Expand Down Expand Up @@ -6302,7 +6310,8 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None:


class GlibcHeapChunkSummary:
def __init__(self):
def __init__(self, desc = ""):
self.desc = desc
self.count = 0
self.total_bytes = 0

Expand All @@ -6312,7 +6321,8 @@ def process_chunk(self, chunk: GlibcChunk) -> None:


class GlibcHeapArenaSummary:
def __init__(self) -> None:
def __init__(self, resolve_type = False) -> None:
self.resolve_symbol = resolve_type
self.size_distribution = {}
self.flag_distribution = {
"PREV_INUSE": GlibcHeapChunkSummary(),
Expand All @@ -6321,10 +6331,12 @@ def __init__(self) -> None:
}

def process_chunk(self, chunk: GlibcChunk) -> None:
per_size_summary = self.size_distribution.get(chunk.size, None)
chunk_type = "" if not self.resolve_symbol else chunk.resolve_type()

per_size_summary = self.size_distribution.get((chunk.size, chunk_type), None)
if per_size_summary is None:
per_size_summary = GlibcHeapChunkSummary()
self.size_distribution[chunk.size] = per_size_summary
per_size_summary = GlibcHeapChunkSummary(desc=chunk_type)
self.size_distribution[(chunk.size, chunk_type)] = per_size_summary
per_size_summary.process_chunk(chunk)

if chunk.has_p_bit():
Expand All @@ -6336,9 +6348,9 @@ def process_chunk(self, chunk: GlibcChunk) -> None:

def print(self) -> None:
gef_print("== Chunk distribution by size ==")
gef_print("{:<10s}\t{:<10s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes"))
for chunk_size, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True):
gef_print("{:<10d}\t{:<10d}\t{:<d}".format(chunk_size, chunk_summary.count, chunk_summary.total_bytes))
gef_print("{:<10s}\t{:<10s}\t{:15s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes", "Description"))
for chunk_info, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True):
gef_print("{:<10d}\t{:<10d}\t{:<15d}\t{:s}".format(chunk_info[0], chunk_summary.count, chunk_summary.total_bytes, chunk_summary.desc))

gef_print("\n== Chunk distribution by flag ==")
gef_print("{:<15s}\t{:<10s}\t{:s}".format("Flag", "TotalCount", "TotalBytes"))
Expand All @@ -6352,7 +6364,7 @@ class GlibcHeapChunksCommand(GenericCommand):
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [arena_address]"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [--resolve] [arena_address]"
_example_ = (f"\n{_cmdline_}"
f"\n{_cmdline_} 0x555555775000")

Expand All @@ -6361,24 +6373,24 @@ def __init__(self) -> None:
self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True})
@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True, "--resolve": True})
@only_if_gdb_running
def do_invoke(self, _: List[str], **kwargs: Any) -> None:
args = kwargs["arguments"]
if args.all or not args.arena_address:
for arena in gef.heap.arenas:
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary)
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
if not args.all:
return
try:
arena_addr = parse_address(args.arena_address)
arena = GlibcArena(f"*{arena_addr:#x}")
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary)
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
except gdb.error:
err("Invalid arena")
return

def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False) -> None:
def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> None:
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
Expand All @@ -6387,18 +6399,18 @@ def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_
gef_print(str(arena))
if arena.is_main_arena():
heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary)
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type)
else:
heap_info_structs = arena.get_heap_info_list() or []
for heap_info in heap_info_structs:
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary):
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type):
break
return

def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False) -> bool:
def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> bool:
nb = self["peek_nb_byte"]
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
heap_summary = GlibcHeapArenaSummary()
heap_summary = GlibcHeapArenaSummary(resolve_type=resolve_type)
for chunk in chunk_iterator:
heap_corrupted = chunk.base_address > end
should_process = self.should_process_chunk(chunk, min_size, max_size)
Expand Down
8 changes: 8 additions & 0 deletions tests/commands/heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ def test_cmd_heap_chunks_summary(self):
self.assertIn("== Chunk distribution by size", res)
self.assertIn("== Chunk distribution by flag", res)

def test_cmd_heap_chunks_summary_with_type_resolved(self):
cmd = "heap chunks --summary --resolve"
target = _target("class")
res = gdb_run_silent_cmd(cmd, target=target, before=["b B<TraitA, TraitB>::Run()"])
self.assertNoException(res)
self.assertIn("== Chunk distribution by size", res)
self.assertIn("B<TraitA, TraitB>", res)

def test_cmd_heap_chunks_min_size_filter(self):
cmd = "heap chunks --min-size 16"
target = _target("heap")
Expand Down