Skip to content

Commit

Permalink
Fix heap chunks cmd for multiple heaps per arena (#716)
Browse files Browse the repository at this point in the history
* Fix heap chunks cmd for multiple heaps per arena
* Update heap chunks screenshot in docs
* Cache get_glibc_arena
  • Loading branch information
theguy147 committed Sep 20, 2021
1 parent 1c80677 commit 11a68a2
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 54 deletions.
24 changes: 14 additions & 10 deletions docs/commands/heap.md
Expand Up @@ -11,25 +11,28 @@ gef➤ heap <sub_commands>

### `heap chunks` command ###

Displays all the chunks from the `heap` section.
Displays all the chunks from the `heap` section of the current arena.

```
gef➤ heap chunks
```

In some cases, the allocation will start immediately from start of the page. If
so, specify the base address of the first chunk as follows:
![heap-chunks](https://i.imgur.com/y90SfKH.png)

To select from which arena to display chunks either use the `heap set-arena`
command or provide the base address of the other arena like this:

```
gef➤ heap chunks [address]
gef➤ heap chunks [arena_address]
```

![heap-chunks](https://i.imgur.com/2Ew2fA6.png)
![heap-chunks-arena](https://i.imgur.com/y1fybRx.png)

Because usually the heap chunks are aligned to a certain number of bytes in
memory GEF automatically re-aligns the chunks data start addresses to match
Glibc's behavior. To be able to view unaligned chunks as well, you can
disable this with the `--allow-unaligned` flag.
Glibc's behavior. To be able to view unaligned chunks as well, you can disable
this with the `--allow-unaligned` flag. Note that this might result in
incorrect output.

### `heap chunk` command ###

Expand All @@ -41,12 +44,13 @@ information related to a specific chunk:
gef➤ heap chunk [address]
```

![heap-chunk](https://i.imgur.com/SAWNptW.png)
![heap-chunk](https://i.imgur.com/WXpHR58.png)

Because usually the heap chunks are aligned to a certain number of bytes in
memory GEF automatically re-aligns the chunks data start addresses to match
Glibc's behavior. To be able to view unaligned chunks as well, you can
disable this with the `--allow-unaligned` flag.
Glibc's behavior. To be able to view unaligned chunks as well, you can disable
this with the `--allow-unaligned` flag. Note that this might result in
incorrect output.

### `heap arenas` command ###

Expand Down
183 changes: 140 additions & 43 deletions gef.py
Expand Up @@ -802,6 +802,8 @@ def __init__(self, addr):
try:
self.__addr = to_unsigned_long(gdb.parse_and_eval("&{}".format(addr)))
except gdb.error:
warn("Could not parse address '&{}' when searching malloc_state struct, "
"using '&main_arena' instead".format(addr))
self.__addr = search_for_main_arena()

self.num_fastbins = 10
Expand Down Expand Up @@ -906,6 +908,62 @@ def __getitem__(self, item):
return getattr(self, item)


class GlibcHeapInfo:
"""Glibc heap_info struct
See https://github.com/bminor/glibc/blob/glibc-2.34/malloc/arena.c#L64"""

def __init__(self, addr):
self.__addr = addr if type(addr) is int else parse_address(addr)
self.size_t = cached_lookup_type("size_t")
if not self.size_t:
ptr_type = "unsigned long" if current_arch.ptrsize == 8 else "unsigned int"
self.size_t = cached_lookup_type(ptr_type)

@property
def addr(self):
return self.__addr

@property
def ar_ptr_addr(self):
return self.addr

@property
def prev_addr(self):
return self.ar_ptr_addr + current_arch.ptrsize

@property
def size_addr(self):
return self.prev_addr + current_arch.ptrsize

@property
def mprotect_size_addr(self):
return self.size_addr + self.size_t.sizeof

@property
def ar_ptr(self):
return self._get_size_t_pointer(self.ar_ptr_addr)

@property
def prev(self):
return self._get_size_t_pointer(self.prev_addr)

@property
def size(self):
return self._get_size_t(self.size_addr)

@property
def mprotect_size(self):
return self._get_size_t(self.mprotect_size_addr)

# helper methods
def _get_size_t_pointer(self, addr):
size_t_pointer = self.size_t.pointer()
return dereference(addr).cast(size_t_pointer)

def _get_size_t(self, addr):
return dereference(addr).cast(self.size_t)


class GlibcArena:
"""Glibc arena class
Ref: https://github.com/sploitfun/lsploits/blob/master/glibc/malloc/malloc.c#L1671"""
Expand Down Expand Up @@ -960,17 +1018,43 @@ def get_next(self):
return None
return GlibcArena("*{:#x} ".format(addr_next))

def heap_addr(self):
main_arena_addr = to_unsigned_long(gdb.parse_and_eval("&main_arena"))
if int(self) == main_arena_addr:
def is_main_arena(self):
return int(self) == parse_address("&main_arena")

def heap_addr(self, allow_unaligned=False):
if self.is_main_arena():
heap_section = HeapBaseFunction.heap_base()
if not heap_section:
err("Heap not initialized")
return None
return heap_section
_addr = int(self) + self.struct_size
if allow_unaligned:
return _addr
return malloc_align_address(_addr)

def get_heap_info_list(self):
if self.is_main_arena():
return None
heap_addr = self.get_heap_for_ptr(self.top)
heap_infos = [GlibcHeapInfo(heap_addr)]
while heap_infos[-1].prev != 0:
prev = int(heap_infos[-1].prev)
heap_info = GlibcHeapInfo(prev)
heap_infos.append(heap_info)
return heap_infos[::-1]

@staticmethod
def get_heap_for_ptr(ptr):
"""Find the corresponding heap for a given pointer (int).
See https://github.com/bminor/glibc/blob/glibc-2.34/malloc/arena.c#L129"""
if is_32bit():
default_mmap_threshold_max = 512 * 1024
else: # 64bit
default_mmap_threshold_max = 4 * 1024 * 1024 * cached_lookup_type("long").sizeof
heap_max_size = 2 * default_mmap_threshold_max
return ptr & ~(heap_max_size - 1)

def __str__(self):
fmt = "Arena (base={:#x}, top={:#x}, last_remainder={:#x}, next={:#x}, next_free={:#x}, system_mem={:#x})"
return fmt.format(self.__addr, self.top, self.last_remainder, self.n, self.nfree, self.sysmem)
Expand Down Expand Up @@ -1016,9 +1100,12 @@ def usable_size(self):
def get_prev_chunk_size(self):
return read_int_from_memory(self.prev_size_addr)

def get_next_chunk(self):
addr = self.data_address + self.get_chunk_size()
return GlibcChunk(addr)
def get_next_chunk(self, allow_unaligned=False):
addr = self.get_next_chunk_addr()
return GlibcChunk(addr, allow_unaligned=allow_unaligned)

def get_next_chunk_addr(self):
return self.data_address + self.get_chunk_size()

# if free-ed functions
def get_fwd_ptr(self, sll):
Expand Down Expand Up @@ -1166,15 +1253,13 @@ def get_libc_version():
return 0, 0


def get_main_arena():
@lru_cache()
def get_glibc_arena(addr=None):
try:
return GlibcArena(__gef_current_arena__)
addr = "*{}".format(addr) if addr else __gef_current_arena__
return GlibcArena(addr)
except Exception as e:
err(
"Failed to get the main arena, heap commands may not work properly: {}".format(
e
)
)
err("Failed to get the glibc arena, heap commands may not work properly: {}".format(e))
return None


Expand Down Expand Up @@ -6874,7 +6959,7 @@ def do_invoke(self, *args, **kwargs):
self.usage()
return

if get_main_arena() is None:
if get_glibc_arena() is None:
return

addr = to_unsigned_long(gdb.parse_and_eval(args.address))
Expand All @@ -6885,61 +6970,73 @@ def do_invoke(self, *args, **kwargs):

@register_command
class GlibcHeapChunksCommand(GenericCommand):
"""Display information all chunks from main_arena heap. If a location is
passed, it must correspond to the base address of the first chunk."""
"""Display all heap chunks for the current arena. As an optional argument
the base address of a different arena can be passed"""

_cmdline_ = "heap chunks"
_syntax_ = "{0} [-h] [--allow-unaligned] [address]".format(_cmdline_)
_syntax_ = "{0} [-h] [--allow-unaligned] [arena_address]".format(_cmdline_)
_example_ = "\n{0}\n{0} 0x555555775000".format(_cmdline_)

def __init__(self):
super().__init__(complete=gdb.COMPLETE_LOCATION)
self.add_setting("peek_nb_byte", 16, "Hexdump N first byte(s) inside the chunk data (0 to disable)")
return

@parse_arguments({"address": ""}, {"--allow-unaligned": True})
@parse_arguments({"arena_address": ""}, {"--allow-unaligned": True})
@only_if_gdb_running
def do_invoke(self, *args, **kwargs):
args = kwargs["arguments"]

arena = get_main_arena()
arena = get_glibc_arena(addr=args.arena_address)
if arena is None:
err("No valid arena")
return
self.dump_chunks_arena(arena, allow_unaligned=args.allow_unaligned)

if not args.address:
heap_addr = arena.heap_addr()
if heap_addr is None:
return
def dump_chunks_arena(self, arena, print_arena=False, allow_unaligned=False):
top_chunk_addr = arena.top
heap_addr = arena.heap_addr(allow_unaligned=allow_unaligned)
if heap_addr is None:
err("Could not find heap for arena")
return
if print_arena:
gef_print(str(arena))
if arena.is_main_arena():
self.dump_chunks_heap(heap_addr, top=top_chunk_addr, allow_unaligned=allow_unaligned)
else:
heap_addr = parse_address(args.address)

heap_info_structs = arena.get_heap_info_list()
first_heap_info = heap_info_structs.pop(0)
heap_info_t_size = int(arena) - first_heap_info.addr
until = first_heap_info.addr + first_heap_info.size
self.dump_chunks_heap(heap_addr, until=until, top=top_chunk_addr, allow_unaligned=allow_unaligned)
for heap_info in heap_info_structs:
start = heap_info.addr + heap_info_t_size
until = heap_info.addr + heap_info.size
self.dump_chunks_heap(start, until=until, top=top_chunk_addr, allow_unaligned=allow_unaligned)
return

def dump_chunks_heap(self, start, until=None, top=None, allow_unaligned=False):
nb = self.get_setting("peek_nb_byte")
current_chunk = GlibcChunk(heap_addr, from_base=True, allow_unaligned=args.allow_unaligned)
current_chunk = GlibcChunk(start, from_base=True, allow_unaligned=allow_unaligned)
while True:
if current_chunk.base_address == arena.top:
if current_chunk.base_address == top:
gef_print("{} {} {}".format(str(current_chunk), LEFT_ARROW, Color.greenify("top chunk")))
break

if current_chunk.base_address > arena.top:
break

if current_chunk.size == 0:
# EOF
break

line = str(current_chunk)
if nb:
line += "\n [" + hexdump(read_memory(current_chunk.data_address, nb), nb, base=current_chunk.data_address) + "]"
line += "\n [{}]".format(hexdump(read_memory(current_chunk.data_address, nb), nb, base=current_chunk.data_address))
gef_print(line)

next_chunk = current_chunk.get_next_chunk()
if next_chunk is None:
next_chunk_addr = current_chunk.get_next_chunk_addr()
if until and next_chunk_addr >= until:
break
if not Address(value=next_chunk_addr).valid:
break

next_chunk_addr = Address(value=next_chunk.data_address)
if not next_chunk_addr.valid:
# corrupted
next_chunk = current_chunk.get_next_chunk()
if next_chunk is None:
break

current_chunk = next_chunk
Expand Down Expand Up @@ -7178,7 +7275,7 @@ def fastbin_index(sz):
MAX_FAST_SIZE = 80 * SIZE_SZ // 4
NFASTBINS = fastbin_index(MAX_FAST_SIZE) - 1

arena = GlibcArena("*{:s}".format(argv[0])) if len(argv) == 1 else get_main_arena()
arena = GlibcArena("*{:s}".format(argv[0])) if len(argv) == 1 else get_glibc_arena()

if arena is None:
err("Invalid Glibc arena")
Expand Down Expand Up @@ -7231,7 +7328,7 @@ def __init__(self):

@only_if_gdb_running
def do_invoke(self, argv):
if get_main_arena() is None:
if get_glibc_arena() is None:
err("Invalid Glibc arena")
return

Expand All @@ -7255,7 +7352,7 @@ def __init__(self):

@only_if_gdb_running
def do_invoke(self, argv):
if get_main_arena() is None:
if get_glibc_arena() is None:
err("Invalid Glibc arena")
return

Expand Down Expand Up @@ -7284,7 +7381,7 @@ def __init__(self):

@only_if_gdb_running
def do_invoke(self, argv):
if get_main_arena() is None:
if get_glibc_arena() is None:
err("Invalid Glibc arena")
return

Expand Down
10 changes: 9 additions & 1 deletion tests/runtests.py
Expand Up @@ -222,6 +222,14 @@ def test_cmd_heap_chunks(self):
self.assertNoException(res)
self.assertIn("Chunk(addr=", res)
self.assertIn("top chunk", res)

cmd = "python gdb.execute('heap chunks {}'.format(get_glibc_arena().next))"
target = "/tmp/heap-non-main.out"
res = gdb_run_silent_cmd(cmd, target=target)
self.assertNoException(res)
self.assertNotIn("using '&main_arena' instead", res)
self.assertIn("Chunk(addr=", res)
self.assertIn("top chunk", res)
return

def test_cmd_heap_bins_fast(self):
Expand All @@ -236,7 +244,7 @@ def test_cmd_heap_bins_fast(self):
return

def test_cmd_heap_bins_non_main(self):
cmd = "python gdb.execute('heap bins fast {}'.format(get_main_arena().next))"
cmd = "python gdb.execute('heap bins fast {}'.format(get_glibc_arena().next))"
before = ["set environment GLIBC_TUNABLES glibc.malloc.tcache_count=0"]
target = "/tmp/heap-non-main.out"
res = gdb_run_silent_cmd(cmd, before=before, target=target)
Expand Down

0 comments on commit 11a68a2

Please sign in to comment.