Skip to content

Commit

Permalink
Dump memory usage when listing arenas, and add summary option (#1024)
Browse files Browse the repository at this point in the history
  • Loading branch information
r12f committed Dec 18, 2023
1 parent 0eb7f5c commit 17c496c
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 18 deletions.
8 changes: 8 additions & 0 deletions docs/commands/heap.md
Expand Up @@ -62,6 +62,14 @@ re-aligns the chunks data start addresses to match Glibc's behavior. To be able
chunks as well, you can disable this with the `--allow-unaligned` flag. Note that this might result
in incorrect output.

To get a higher level overview of the chunks you can use the `--summary` flag too.

```text
gef➤ heap chunks --summary
```

![heap-chunks-summary](https://i.imgur.com/3HTgtwX.png)

### `heap chunk` command

This command gives visual information of a Glibc malloc-ed chunked. Simply provide the address to
Expand Down
93 changes: 75 additions & 18 deletions gef.py
Expand Up @@ -1375,7 +1375,8 @@ def __eq__(self, other: "GlibcArena") -> bool:

def __str__(self) -> str:
properties = f"base={self.__address:#x}, top={self.top:#x}, " \
f"last_remainder={self.last_remainder:#x}, next={self.next:#x}"
f"last_remainder={self.last_remainder:#x}, next={self.next:#x}, " \
f"mem={self.system_mem}, mempeak={self.max_system_mem}"
return (f"{Color.colorify('Arena', 'blue bold underline')}({properties})")

def __repr__(self) -> str:
Expand Down Expand Up @@ -6287,13 +6288,58 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None:
return


class GlibcHeapChunkSummary:
def __init__(self):
self.count = 0
self.total_bytes = 0

def process_chunk(self, chunk: GlibcChunk) -> None:
self.count += 1
self.total_bytes += chunk.size


class GlibcHeapArenaSummary:
def __init__(self) -> None:
self.size_distribution = {}
self.flag_distribution = {
"PREV_INUSE": GlibcHeapChunkSummary(),
"IS_MMAPPED": GlibcHeapChunkSummary(),
"NON_MAIN_ARENA": GlibcHeapChunkSummary()
}

def process_chunk(self, chunk: GlibcChunk) -> None:
per_size_summary = self.size_distribution.get(chunk.size, None)
if per_size_summary is None:
per_size_summary = GlibcHeapChunkSummary()
self.size_distribution[chunk.size] = per_size_summary
per_size_summary.process_chunk(chunk)

if chunk.has_p_bit():
self.flag_distribution["PREV_INUSE"].process_chunk(chunk)
if chunk.has_m_bit():
self.flag_distribution["IS_MAPPED"].process_chunk(chunk)
if chunk.has_n_bit():
self.flag_distribution["NON_MAIN_ARENA"].process_chunk(chunk)

def print(self) -> None:
gef_print("== Chunk distribution by size ==")
gef_print("{:<10s}\t{:<10s}\t{:s}".format("ChunkBytes", "Count", "TotalBytes"))
for chunk_size, chunk_summary in sorted(self.size_distribution.items(), key=lambda x: x[1].total_bytes, reverse=True):
gef_print("{:<10d}\t{:<10d}\t{:<d}".format(chunk_size, chunk_summary.count, chunk_summary.total_bytes))

gef_print("\n== Chunk distribution by flag ==")
gef_print("{:<15s}\t{:<10s}\t{:s}".format("Flag", "TotalCount", "TotalBytes"))
for chunk_flag, chunk_summary in self.flag_distribution.items():
gef_print("{:<15s}\t{:<10d}\t{:<d}".format(chunk_flag, chunk_summary.count, chunk_summary.total_bytes))


@register
class GlibcHeapChunksCommand(GenericCommand):
"""Display all heap chunks for the current arena. As an optional argument
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [arena_address]"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [arena_address]"
_example_ = (f"\n{_cmdline_}"
f"\n{_cmdline_} 0x555555775000")

Expand All @@ -6302,24 +6348,24 @@ def __init__(self) -> None:
self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True})
@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, ("--summary", "-s"): True})
@only_if_gdb_running
def do_invoke(self, _: List[str], **kwargs: Any) -> None:
args = kwargs["arguments"]
if args.all or not args.arena_address:
for arena in gef.heap.arenas:
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned)
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, summary=args.summary)
if not args.all:
return
try:
arena_addr = parse_address(args.arena_address)
arena = GlibcArena(f"*{arena_addr:#x}")
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned)
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, summary=args.summary)
except gdb.error:
err("Invalid arena")
return

def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False) -> None:
def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, summary: bool = False) -> None:
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
Expand All @@ -6328,31 +6374,42 @@ def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_
gef_print(str(arena))
if arena.is_main_arena():
heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned)
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, summary=summary)
else:
heap_info_structs = arena.get_heap_info_list() or []
for heap_info in heap_info_structs:
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned):
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, summary=summary):
break
return

def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False) -> bool:
def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, summary: bool = False) -> bool:
nb = self["peek_nb_byte"]
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
heap_summary = GlibcHeapArenaSummary()
for chunk in chunk_iterator:
if chunk.base_address == arena.top:
gef_print(
f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}")
break
heap_corrupted = chunk.base_address > end

if not summary:
if chunk.base_address == arena.top:
gef_print(
f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}")
break

if chunk.base_address > end:
if heap_corrupted:
err("Corrupted heap, cannot continue.")
return False

line = str(chunk)
if nb:
line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]"
gef_print(line)
if summary:
heap_summary.process_chunk(chunk)
else:
line = str(chunk)
if nb:
line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]"
gef_print(line)

if summary:
heap_summary.print()

return True


Expand Down
8 changes: 8 additions & 0 deletions tests/commands/heap.py
Expand Up @@ -94,6 +94,14 @@ def test_cmd_heap_chunks_mult_heaps(self):
self.assertIn("Chunk(addr=", res)
self.assertIn("top chunk", res)

def test_cmd_heap_chunks_summary(self):
cmd = "heap chunks --summary"
target = _target("heap")
self.assertFailIfInactiveSession(gdb_run_cmd(cmd, target=target))
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertIn("== Chunk distribution by size", res)
self.assertIn("== Chunk distribution by flag", res)

def test_cmd_heap_bins_fast(self):
cmd = "heap bins fast"
Expand Down

0 comments on commit 17c496c

Please sign in to comment.