Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dump memory usage when listing arenas, and add summary option in heap chunks command. #1024

Merged
merged 7 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/commands/heap.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ re-aligns the chunks data start addresses to match Glibc's behavior. To be able
chunks as well, you can disable this with the `--allow-unaligned` flag. Note that this might result
in incorrect output.

To get a higher level overview of the chunks you can use the `--summary` flag too.

```text
gef➤ heap chunks --summary
```

![heap-chunks-summary](https://i.imgur.com/3HTgtwX.png)

### `heap chunk` command

This command gives visual information of a Glibc malloc-ed chunked. Simply provide the address to
Expand Down
93 changes: 75 additions & 18 deletions gef.py
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,8 @@ def __eq__(self, other: "GlibcArena") -> bool:

def __str__(self) -> str:
properties = f"base={self.__address:#x}, top={self.top:#x}, " \
f"last_remainder={self.last_remainder:#x}, next={self.next:#x}"
f"last_remainder={self.last_remainder:#x}, next={self.next:#x}, " \
f"mem={self.system_mem}, mempeak={self.max_system_mem}"
return (f"{Color.colorify('Arena', 'blue bold underline')}({properties})")

def __repr__(self) -> str:
Expand Down Expand Up @@ -6287,13 +6288,58 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None:
return


class GlibcHeapChunkSummary:
r12f marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self):
self.count = 0
self.total_bytes = 0

def process_chunk(self, chunk: GlibcChunk) -> None:
self.count += 1
self.total_bytes += chunk.size


r12f marked this conversation as resolved.
Show resolved Hide resolved
class GlibcHeapArenaSummary:
def __init__(self) -> None:
self.size_distribution = {}
self.flag_distribution = {
"PREV_INUSE": GlibcHeapChunkSummary(),
"IS_MMAPPED": GlibcHeapChunkSummary(),
"NON_MAIN_ARENA": GlibcHeapChunkSummary()
}

def process_chunk(self, chunk: GlibcChunk) -> None:
per_size_summary = self.size_distribution.get(chunk.size, None)
if per_size_summary is None:
per_size_summary = GlibcHeapChunkSummary()
self.size_distribution[chunk.size] = per_size_summary
per_size_summary.process_chunk(chunk)

if chunk.has_p_bit():
self.flag_distribution["PREV_INUSE"].process_chunk(chunk)
if chunk.has_m_bit():
self.flag_distribution["IS_MAPPED"].process_chunk(chunk)
if chunk.has_n_bit():
self.flag_distribution["NON_MAIN_ARENA"].process_chunk(chunk)

def print(self) -> None:
gef_print("== Chunk distribution by size ==")
gef_print("{:<10s}\t{:<10s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes"))
for chunk_size, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True):
gef_print("{:<10d}\t{:<10d}\t{:<d}".format(chunk_size, chunk_summary.count, chunk_summary.total_bytes))

gef_print("\n== Chunk distribution by flag ==")
gef_print("{:<15s}\t{:<10s}\t{:s}".format("Flag", "TotalCount", "TotalBytes"))
for chunk_flag, chunk_summary in self.flag_distribution.items():
gef_print("{:<15s}\t{:<10d}\t{:<d}".format(chunk_flag, chunk_summary.count, chunk_summary.total_bytes))


@register
class GlibcHeapChunksCommand(GenericCommand):
"""Display all heap chunks for the current arena. As an optional argument
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [arena_address]"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [arena_address]"
_example_ = (f"\n{_cmdline_}"
f"\n{_cmdline_} 0x555555775000")

Expand All @@ -6302,24 +6348,24 @@ def __init__(self) -> None:
self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True})
@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, ("--summary", "-s"): True})
@only_if_gdb_running
def do_invoke(self, _: List[str], **kwargs: Any) -> None:
args = kwargs["arguments"]
if args.all or not args.arena_address:
for arena in gef.heap.arenas:
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned)
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, summary=args.summary)
if not args.all:
return
try:
arena_addr = parse_address(args.arena_address)
arena = GlibcArena(f"*{arena_addr:#x}")
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned)
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, summary=args.summary)
except gdb.error:
err("Invalid arena")
return

def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False) -> None:
def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, summary: bool = False) -> None:
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
Expand All @@ -6328,31 +6374,42 @@ def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_
gef_print(str(arena))
if arena.is_main_arena():
heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned)
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, summary=summary)
else:
heap_info_structs = arena.get_heap_info_list() or []
for heap_info in heap_info_structs:
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned):
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, summary=summary):
break
return

def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False) -> bool:
def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, summary: bool = False) -> bool:
nb = self["peek_nb_byte"]
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
heap_summary = GlibcHeapArenaSummary()
for chunk in chunk_iterator:
if chunk.base_address == arena.top:
gef_print(
f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}")
break
heap_corrupted = chunk.base_address > end

if not summary:
if chunk.base_address == arena.top:
gef_print(
f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}")
break

if chunk.base_address > end:
if heap_corrupted:
err("Corrupted heap, cannot continue.")
return False

line = str(chunk)
if nb:
line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]"
gef_print(line)
if summary:
heap_summary.process_chunk(chunk)
else:
line = str(chunk)
if nb:
line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]"
gef_print(line)

if summary:
heap_summary.print()

return True


Expand Down
8 changes: 8 additions & 0 deletions tests/commands/heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ def test_cmd_heap_chunks_mult_heaps(self):
self.assertIn("Chunk(addr=", res)
self.assertIn("top chunk", res)

def test_cmd_heap_chunks_summary(self):
cmd = "heap chunks --summary"
target = _target("heap")
self.assertFailIfInactiveSession(gdb_run_cmd(cmd, target=target))
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertIn("== Chunk distribution by size", res)
self.assertIn("== Chunk distribution by flag", res)

def test_cmd_heap_bins_fast(self):
cmd = "heap bins fast"
Expand Down