Skip to content

Commit

Permalink
Add count option in heap chunks command to limit the number of chunks…
Browse files Browse the repository at this point in the history
… to process / output. (#1029)

This change add `--count` option in heap chunks command to limit the
number of chunks to process / output.

Currently, since we don't have the count limit, even with the size
filter, we might get too many chunks in the output, especially when
debugging large dumps.

With this change, we can use `--count` option to give us only a few
samples to check what might be happening, e.g. large buffers with
certain special size.

Here is the screenshot that demos the usage (mixed with `--min-size`)L

![image](https://github.com/hugsy/gef/assets/1533278/2825d9fe-6973-43c0-b1db-0b88963b3256)
  • Loading branch information
r12f committed Jan 2, 2024
1 parent a2704c9 commit d4b849e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 19 deletions.
9 changes: 9 additions & 0 deletions docs/commands/heap.md
Expand Up @@ -100,6 +100,15 @@ gef➤ heap chunks --min-size 16 --max-size 32

The range is inclusive, so the above command will display all chunks with size >=16 and <=32.

If heap chunks command still gives too many chunks, we can use `--count` argument to limit the number
of the chunks in the output:

```text
gef➤ heap chunks --count 1
```

![heap-chunks-size-filter](https://i.imgur.com/EinuDAt.png)

### `heap chunk` command

This command gives visual information of a Glibc malloc-ed chunked. Simply provide the address to
Expand Down
53 changes: 34 additions & 19 deletions gef.py
Expand Up @@ -6358,14 +6358,23 @@ def print(self) -> None:
for chunk_flag, chunk_summary in self.flag_distribution.items():
gef_print("{:<15s}\t{:<10d}\t{:<d}".format(chunk_flag, chunk_summary.count, chunk_summary.total_bytes))

class GlibcHeapWalkContext:
def __init__(self, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, count: int = -1, resolve_type: bool = False, summary: bool = False) -> None:
self.print_arena = print_arena
self.allow_unaligned = allow_unaligned
self.min_size = min_size
self.max_size = max_size
self.remaining_chunk_count = count
self.summary = summary
self.resolve_type = resolve_type

@register
class GlibcHeapChunksCommand(GenericCommand):
"""Display all heap chunks for the current arena. As an optional argument
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [--resolve] [arena_address]"
_syntax_ = f"{_cmdline_} [-h] [--all] [--allow-unaligned] [--summary] [--min-size MIN_SIZE] [--max-size MAX_SIZE] [--count COUNT] [--resolve] [arena_address]"
_example_ = (f"\n{_cmdline_}"
f"\n{_cmdline_} 0x555555775000")

Expand All @@ -6374,49 +6383,50 @@ def __init__(self) -> None:
self["peek_nb_byte"] = (16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--summary", "-s"): True, "--resolve": True})
@parse_arguments({"arena_address": ""}, {("--all", "-a"): True, "--allow-unaligned": True, "--min-size": 0, "--max-size": 0, ("--count", "-n"): -1, ("--summary", "-s"): True, "--resolve": True})
@only_if_gdb_running
def do_invoke(self, _: List[str], **kwargs: Any) -> None:
args = kwargs["arguments"]
ctx = GlibcHeapWalkContext(print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, count=args.count, resolve_type=args.resolve, summary=args.summary)
if args.all or not args.arena_address:
for arena in gef.heap.arenas:
self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
self.dump_chunks_arena(arena, ctx)
if not args.all:
return
try:
arena_addr = parse_address(args.arena_address)
arena = GlibcArena(f"*{arena_addr:#x}")
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned, min_size=args.min_size, max_size=args.max_size, summary=args.summary, resolve_type=args.resolve)
self.dump_chunks_arena(arena, ctx)
except gdb.error:
err("Invalid arena")
return

def dump_chunks_arena(self, arena: GlibcArena, print_arena: bool = False, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> None:
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
def dump_chunks_arena(self, arena: GlibcArena, ctx: GlibcHeapWalkContext) -> None:
heap_addr = arena.heap_addr(allow_unaligned=ctx.allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
return
if print_arena:
if ctx.print_arena:
gef_print(str(arena))
if arena.is_main_arena():
heap_end = arena.top + GlibcChunk(arena.top, from_base=True).size
self.dump_chunks_heap(heap_addr, heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type)
self.dump_chunks_heap(heap_addr, heap_end, arena, ctx)
else:
heap_info_structs = arena.get_heap_info_list() or []
for heap_info in heap_info_structs:
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, allow_unaligned=allow_unaligned, min_size=min_size, max_size=max_size, summary=summary, resolve_type=resolve_type):
if not self.dump_chunks_heap(heap_info.heap_start, heap_info.heap_end, arena, ctx):
break
return

def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unaligned: bool = False, min_size: int = 0, max_size: int = 0, summary: bool = False, resolve_type: bool = False) -> bool:
def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, ctx: GlibcHeapWalkContext) -> bool:
nb = self["peek_nb_byte"]
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
heap_summary = GlibcHeapArenaSummary(resolve_type=resolve_type)
chunk_iterator = GlibcChunk(start, from_base=True, allow_unaligned=ctx.allow_unaligned)
heap_summary = GlibcHeapArenaSummary(resolve_type=ctx.resolve_type)
for chunk in chunk_iterator:
heap_corrupted = chunk.base_address > end
should_process = self.should_process_chunk(chunk, min_size, max_size)
should_process = self.should_process_chunk(chunk, ctx)

if not summary and chunk.base_address == arena.top:
if not ctx.summary and chunk.base_address == arena.top:
if should_process:
gef_print(
f"{chunk!s} {LEFT_ARROW} {Color.greenify('top chunk')}")
Expand All @@ -6429,24 +6439,29 @@ def dump_chunks_heap(self, start: int, end: int, arena: GlibcArena, allow_unalig
if not should_process:
continue

if summary:
if ctx.remaining_chunk_count == 0:
break

if ctx.summary:
heap_summary.process_chunk(chunk)
else:
line = str(chunk)
if nb:
line += f"\n [{hexdump(gef.memory.read(chunk.data_address, nb), nb, base=chunk.data_address)}]"
gef_print(line)

if summary:
ctx.remaining_chunk_count -= 1

if ctx.summary:
heap_summary.print()

return True

def should_process_chunk(self, chunk: GlibcChunk, min_size: int, max_size: int) -> bool:
if chunk.size < min_size:
def should_process_chunk(self, chunk: GlibcChunk, ctx: GlibcHeapWalkContext) -> bool:
if chunk.size < ctx.min_size:
return False

if 0 < max_size < chunk.size:
if 0 < ctx.max_size < chunk.size:
return False

return True
Expand Down
15 changes: 15 additions & 0 deletions tests/commands/heap.py
Expand Up @@ -141,6 +141,21 @@ def test_cmd_heap_chunks_max_size_filter(self):
self.assertNoException(res)
self.assertNotIn("Chunk(addr=", res)

def test_cmd_heap_chunks_with_count(self):
cmd = "heap chunks --count 1"
target = debug_target("heap")
self.assertFailIfInactiveSession(gdb_run_cmd(cmd, target=target))
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertIn("Chunk(addr=", res)

cmd = "heap chunks --count 0"
target = debug_target("heap")
self.assertFailIfInactiveSession(gdb_run_cmd(cmd, target=target))
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertNotIn("Chunk(addr=", res)

def test_cmd_heap_bins_fast(self):
cmd = "heap bins fast"
before = ["set environment GLIBC_TUNABLES glibc.malloc.tcache_count=0"]
Expand Down

0 comments on commit d4b849e

Please sign in to comment.