Skip to content
This repository has been archived by the owner. It is now read-only.
Go to file
Cannot retrieve contributors at this time
5538 lines (4358 sloc) 219 KB
# glibc heap analysis classes
# Copyright (c) 2018, Frank Block, ERNW GmbH <>
# All rights reserved.
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * The names of the contributors may not be used to endorse or promote products
# derived from this software without specific prior written permission.
"""This module implements several classes that allow analyzing the glibc heap
of a given process.
"""
from __future__ import print_function
__author__ = "Frank Block <>"
import re
import pdb
import struct
import traceback
from numbers import Number
from builtins import str
from builtins import hex
from builtins import range
from rekall.plugins.overlays import basic
from rekall.plugins.linux import common
from rekall.plugins.linux import cpuinfo
from import InvalidAddress
from rekall.io_manager import IOManagerError
from rekall.plugins import yarascanner
from rekall.plugins import core
from rekall import addrspace
from rekall import scan
from rekall import obj
from rekall_lib import utils
# is set on HeapAnalysis instantiation
# More versions would probably work, especially when the corresponding vtype
# information is provided, but these are the versions we tested against
_SUPPORTED_GLIBC_VERSIONS = ['2.27', '2.26', '2.25', '2.24', '2.23', '2.22', '2.21', '2.20']
_LIBC_REGEX = '(?:^|/)libc[^a-zA-Z][^/]*\\.so'
def get_vma_for_offset(vmas, offset):
    """Returns the range entry of vmas that the given offset belongs to.

    Expects the output from _get_vmas_for_task as argument. The lookup is
    delegated to vmas.get_containing_range(offset); if the first element of
    its result is None (no containing range), None is returned instead.
    """
    vma = vmas.get_containing_range(offset)
    if vma[0] is None:
        return None
    return vma
def get_vma_name_for_regex(vmas, regex):
    """Searches all vma names for the given regex and returns the first hit.

    vmas is iterated as (start, end, data) triples, where data['name'] holds
    the name to test. Matching is case insensitive. Returns None if vmas or
    regex is empty, or if no name matches.
    """
    if not vmas or not regex:
        return None

    for _, _, vm_data in vmas:
        name = vm_data['name']
        if re.search(regex, name, re.IGNORECASE):
            return name

    return None
def get_libc_filename(vmas):
    """Returns the libc file name from the vma, where the _LIBC_REGEX matches.

    Thin wrapper around get_vma_name_for_regex.
    """
    return get_vma_name_for_regex(vmas, _LIBC_REGEX)
def get_libc_range(vmas):
    """Returns the lowest and highest address for the libc vma. See also
    get_mem_range_for_regex."""
    return get_mem_range_for_regex(vmas, _LIBC_REGEX)
def get_mem_range_for_regex(vmas, regex):
    """Returns the lowest and highest address of memory areas belonging to the
    vm_areas, the given regex matches on. The result is given as a list, where
    the lowest address is the first element. Expects the output from
    _get_vmas_for_task as argument.

    Returns None if vmas is empty or no vm_area name matches.
    """
    if not vmas:
        return None

    offsets = None

    for vm_start, vm_end, vm_data in vmas:
        if not re.search(regex, vm_data['name'], re.IGNORECASE):
            continue

        if offsets is None:
            # The first hit seeds both borders, so that offsets[1] exists for
            # the comparisons on later hits.
            offsets = [vm_start, vm_end]
            continue

        if vm_start < offsets[0]:
            offsets[0] = vm_start

        if vm_end > offsets[1]:
            offsets[1] = vm_end

    return offsets
class HeapAnalysis(common.LinProcessFilter):
"""Basic abstract class for linux heap analysis.
Mostly serves the main_arena.
__abstract = True
_main_heap_identifier = "[heap]"
# Used to mark vm_areas residing between the [heap] and the first file or
# stack object, as well as all other vm_areas that have no file object
# (mmapped regions can also reside somewhere beyond the typical heap/stack
# area). Note that vmas with this identifier might be empty, old thread
# stacks, or vm_areas belonging to e.g. mapped files.
# A pretty reliable source for heap vm_areas is the self.heap_vmas list, as
# it contains vm_areas which have been identified as most probably belonging
# to a heap or mmapped region.
_heap_vma_identifier = "[heap-vma]"
_pot_mmapped_vma_identifier = "[pot-mmapped-vma]"
# is normally only automatically set when using a dummy arena or the chunk
# dumper, as in those cases all chunks are walked at least two times
def activate_chunk_preservation(self):
"""Sets _preserve_chunks to True. This forces all allocated chunk
functions to store chunks in lists, which highly increases the speed
of a second walk over those chunks. This feature can be disabled
with the cmd line option disable_chunk_preservation."""
# The guard below only passes if preservation is not active yet and the
# plugin argument disable_chunk_preservation is not set.
# NOTE(review): the message literals below are closed by ")" but no opening
# call is visible in this copy -- presumably a logging call was lost. The
# nesting of the final assignment is also not recoverable from here; confirm
# both against the upstream file.
if not self._preserve_chunks and \
not self.plugin_args.disable_chunk_preservation:
"Chunk preservation has been activated. "
"This might consume large amounts of memory "
"depending on the chunk count. If you are low on free memory "
"space (RAM), you might want to deactivate this feature by "
"using the cmd line option disable_chunk_preservation. The "
"only downside is a longer plugin runtime.")
self._preserve_chunks = True
def _get_saved_stack_frame_pointers(self, task):
"""Returns a list of dicts, containing the ebp,esp and pid values
for each thread."""
# Walks task.thread_group via walk_list("prev"), rebuilds each thread's
# task_struct from the list entry offset and fetches its pt_regs through
# _get_task_pt_regs.
# NOTE(review): this copy is truncated: the guard condition after "if not",
# the argument lists of get_obj_offset() and task_struct(), and the code that
# appends to thread_stack_offsets are not visible. Restore them from the
# upstream file before relying on this method.
if not
return None
# To gather thread stacks, we examine the pt_regs struct for each
# thread and extract the saved stack frame pointers
thread_stack_offsets = []
thread_group_offset = self.profile.get_obj_offset("task_struct",
for thread_group in task.thread_group.walk_list("prev"):
thread_task = self.profile.task_struct(
offset=thread_group.obj_offset - thread_group_offset,
pt_regs = self._get_task_pt_regs(thread_task)
return thread_stack_offsets
def _get_pt_regs_offset(self):
"""Used for _get_task_pt_regs"""
# The result is stored in self._pt_regs_offset by __init__ and added to the
# thread's stack address in _get_task_pt_regs.
# NOTE(review): only the kernel-config/architecture tests and the THREAD_SIZE
# computation are visible in this copy. The branch bodies, the assignment of
# THREAD_SIZE_ORDER and the return statement are missing; confirm against the
# upstream file.
# taken from arch/x86/include/asm/page_64_types.h
if self.session.profile.get_kernel_config('CONFIG_X86_32'):
if self.session.profile.get_kernel_config('CONFIG_VM86'):
if self.session.profile.get_kernel_config("CONFIG_KASAN"):
if self.session.profile.metadata("arch") == 'AMD64':
THREAD_SIZE = self._min_pagesize << THREAD_SIZE_ORDER
def _get_task_pt_regs(self, thread_task):
# Since sp0 is not accessible anymore on x86_64, we use this method
# See
# and task_pt_regs in arch/x86/include/asm/processor.h
pt_regs_offset = thread_task.stack.v() + self._pt_regs_offset
pt_regs_offset -= self.profile.get_obj_size("pt_regs")
return self.profile.pt_regs(offset=pt_regs_offset, vm=self.process_as)
# Basically the code from the proc_maps plugin but with thread specific
# enhancements
def _get_vmas_for_task(self, task):
"""Returns a list of lists, containing ["name", vm_area] pairs. """
# The result is a utils.RangedCollection: every vma is inserted with
# (vm_start, vm_end, data), where data carries 'vm_flags', 'is_file', 'name'
# and, for stack areas, 'ebp' and 'esp'.
# NOTE(review): this copy is truncated: the operands of several comparisons
# (the "<= <=" chains), the guard after "if not", the iterated list in the
# vma loop, the right-hand side of the 'start_stack' assignment, part of the
# "[stack" name expression and the else lines are missing. Restore them from
# the upstream file before relying on this method.
if not
return None
result = utils.RangedCollection()
thread_stack_offsets = self._get_saved_stack_frame_pointers(task)
if not thread_stack_offsets:
thread_stack_offsets = list()
# The first pair contains the "main" thread and the mm start_stack
# value is more reliable for identifying the relevant memory region
# than the saved frame pointers
thread_stack_offsets[0]['start_stack'] =
heap_area = False
for vma in"vm_next"):
data = dict()
data['vm_flags'] = ''.join(vma.vm_flags)
if vma.vm_file.dereference():
data['is_file'] = True
fname = task.get_path(vma.vm_file)
if heap_area:
heap_area = False
data['is_file'] = False
fname = ""
if heap_area:
fname = self._heap_vma_identifier
fname = self._pot_mmapped_vma_identifier
# main heap can have 3 or more vm_area_struct structs
if vma.vm_start <= <= vma.vm_end or \
( <= vma.vm_start
< vma.vm_end <= or \
vma.vm_start <= <= vma.vm_end:
fname = self._main_heap_identifier
heap_area = True
for offsets in thread_stack_offsets:
if (('start_stack' in list(offsets.keys()) and
vma.vm_start <= offsets['start_stack']
<= vma.vm_end) or
vma.vm_start <= offsets['esp'] <= vma.vm_end):
fname = "[stack"
pid = offsets['pid']
fname += "]" if == pid else \
data['ebp'] = offsets['ebp']
data['esp'] = offsets['esp']
heap_area = False
data['name'] = fname
result.insert(vma.vm_start, vma.vm_end, data)
return result
def _load_libc_profile(self):
"""Loads the Libc profile for the current libc version."""
# Visible flow: the version string comes from the glibc_version plugin
# argument or from a "<major>.<minor>" match in the mapped libc file name,
# with '224' as fallback. A profile is first requested via
# session.LoadProfile; if none is found, an internal GlibcProfile32 or
# GlibcProfile64 is chosen by architecture. Success is recorded in
# self._libc_profile_success when the four malloc vtypes are present.
# NOTE(review): this copy is heavily truncated: try/else lines, the
# re.search call, the match.group() arguments, the logging calls and the
# LoadProfile/GlibcProfile argument lists are missing. Restore them from the
# upstream file before relying on this method.
libc_version_string = None
libc_profile = None
if self.plugin_args.glibc_version:
libc_version_string = str(self.plugin_args.glibc_version)
# we try to gather version information from the mapped libc lib
major_version = None
minor_version = None
match = None
libc_filename = get_libc_filename(self.vmas)
if libc_filename:
match ='(\d+)\.(\d+)', libc_filename)
# Fallback
libc_version_string = '224'
if match and len(match.groups()) == 2:
major_version = int(
minor_version = int(
libc_version_string = str(major_version) + str(minor_version)"Trying to load profile for version {:s}"
" from the repository."
# TODO: dynamic selection of distribution specific profiles
dist = 'base'
libc_profile = None
libc_profile = self.session.LoadProfile(
except IOManagerError as ex:
# gets e.g. thrown when a specified repository folder is not
# existent
"Error while trying to load a repository: {:s}"
if not libc_profile:
# fallback: there seems to be no profile from the repository,
# so we try to load a profile internally
"Repository failed: Now using internal profiles.")
if not self.plugin_args.glibc_version and major_version and \
if major_version == 2:
if minor_version >= 26:
libc_version_string = '226'
elif minor_version == 24 or minor_version == 25:
libc_version_string = '224'
elif minor_version == 23:
libc_version_string = '223'
libc_version_string = '220'
"Unsupported glibc major version number.")
"Loading internal profile for version {:s}."
if self.session.profile.metadata("arch") == 'I386':
libc_profile = GlibcProfile32(version=libc_version_string,
elif self.session.profile.metadata("arch") == 'AMD64':
libc_profile = GlibcProfile64(version=libc_version_string,
if not libc_profile:
self.session.logging.error('Unable to load a libc profile.')
self._libc_version = libc_version_string
if all(x in list(self.profile.vtypes.keys()) for x in
['malloc_chunk', 'malloc_state',
'malloc_par', '_heap_info']):
self._libc_profile_success = True
self.session.logging.error('Error while loading libc profile.')
# Returns False if the chunk ends beyond current_border, if its size is
# below self._minsize, or if its size fails _aligned_ok; True otherwise.
# For mmapped chunks the prev_size value is added to the size before the
# alignment test.
# NOTE(review): in this copy the docstring is not closed, the logging calls
# around the message literals are missing and the else operand of the
# chunksize expression is cut off; confirm against the upstream file.
def _check_and_report_chunksize(
self, chunk, current_border, mmapped=False):
"""Checks whether or not the current chunk
- is bigger than the given border
- smaller than the minimum size a chunk is allowed to be
- 's address is aligned.
if chunk.v() + chunk.chunksize() > current_border:
"Chunk at offset 0x{:x} has a size larger than the current "
"memory region. This shouldn't be the case."
return False
if chunk.chunksize() < self._minsize:
if not self._check_and_report_chunk_for_being_swapped(chunk):
"Chunk at offset 0x{:x} has a size smaller than MINSIZE, "
"which shouldn't be the case and indicates a problem."
return False
# If the MALLOC_ALIGNMENT has been changed via compile time options,
# mmapped chunks are shifted a few bytes forward to be aligned, while
# the size of the gap is saved in the prev_size field (see also
# explanation for self._front_misalign_correction).
# This gap must be included in the calculation for aligned_ok
chunksize = chunk.chunksize() + chunk.get_prev_size() if mmapped else \
if not self._aligned_ok(chunksize):
"The size of chunk at offset 0x{:x} is not a multiple of "
"MALLOC_ALIGNMENT, which shouldn't be the case and indicates "
"a problem.".format(chunk.v()))
return False
return True
def _check_and_report_chunk_for_being_swapped(self, chunk):
"""Tests the size field of a given chunk for being swapped."""
# Visible flow: a falsy chunk yields True; for a chunk whose get_size() is
# 0, the address of the size field (chunk offset + size of prev_size) is
# handed to _check_address_for_being_swapped.
# NOTE(review): the logging calls around the message literals and parts of
# the messages are missing in this copy, so the nesting of the trailing
# return statements cannot be recovered from here; confirm against the
# upstream file.
if not chunk:
# as we don't have any object to analyze further, we just return
# true in this case
return True
elif chunk.get_size() == 0:
# since the name for the size field for malloc_chunk changed in the
# past, we don't gather the offset for the size field via
# get_obj_offset, but use obj_size
size_offset = chunk.prev_size.obj_size
if self._check_address_for_being_swapped(chunk.v() + size_offset):
"The memory page(s) belonging to the "
"chunk at offset 0x{:x} have been swapped. This will "
"lead to incorrect/incomplete results and more "
return True
"The memory page(s) belonging to the chunk at offset "
"0x{:x} don't seem to have been swapped, but the chunk "
"has a size of 0, which shouldn't be the case and "
"will lead to incorrect/incomplete results and more "
return False
def _check_address_for_being_swapped(self, address):
    """Tests whether the page for the given address is swapped.

    Asks the process address space to describe the translation of the
    address and returns True as soon as one of the yielded descriptors is
    an InvalidAddress, False otherwise.
    """
    descriptors = self.process_as.describe_vtop(address)
    return any(isinstance(entry, InvalidAddress) for entry in descriptors)
# Visible flow: the chunk is first validated via _check_and_report_chunksize
# and the alignment of its data offset (_chunk2mem). The PREV_INUSE bit of
# next_chunk then decides which bin memberships (arena.freed_chunks,
# arena.freed_fast_chunks, arena.freed_tcache_chunks) are expected.
# NOTE(review): in this copy the docstring is not closed and the logging
# calls, else lines and parts of the messages are missing, so the exact
# nesting of the checks cannot be recovered from here; confirm against the
# upstream file.
def _check_and_report_allocated_chunk(
self, arena, chunk, next_chunk, current_border):
"""Checks if the given chunk should be in use (depending on the
PREV_INUSE bit of the next chunk), has a size > MINSIZE, is aligned,
whether or not it is part of any bin or fastbin in conjunction with
next_chunks PREV_INUSE bit and if next_chunks prev_size field has
same value as current chunk's size. This function is not intended to
be used for the "bottom chunks". It returns True if no error occurs
and if the given chunk is not part of bins or fastbins.
error_base_string = (
"Found a presumably {0} chunk at offset 0x{1:x} which is however "
"{2}part of the bins. This is unexpected and might either "
"indicate an error or possibly in seldom cases be the result "
"from a race condition.")
if not self._check_and_report_chunksize(chunk, current_border):
return False
if not self._aligned_ok(self._chunk2mem(chunk)):
"Chunk at offset 0x{:x} is not aligned. As chunks are normally"
" always aligned, this indicates a mistakenly chosen chunk and"
" probably results in wrong results.".format(chunk.v()))
return False
# current chunk is tested in _check_and_report_chunksize for
# being swapped
if next_chunk.prev_inuse():
# for chunks in fastbins, the prev_inuse bit is not unset,
# so we don't check that here
if chunk in arena.freed_chunks:
# freed chunks shouldn't be marked as in use
error_base_string.format("allocated", chunk.v(), "")
elif chunk not in \
(arena.freed_fast_chunks + arena.freed_tcache_chunks):
return True
# current chunk seems to be freed, hence its size should equal
# next chunk's prev_size
if chunk.chunksize() != next_chunk.get_prev_size():
"Chunk at offset 0x{:x} seems to be freed but its size "
"doesn't match the next chunk's prev_size value."
elif chunk in \
(arena.freed_fast_chunks + arena.freed_tcache_chunks):
# fastbins/tcache chunks normally have the prev_inuse bit set
"Unexpected: Found fastbin-chunk at offset 0x{0:x} which "
"prev_inuse bit is unset. This shouldn't normally be the "
elif chunk not in arena.freed_chunks:
# chunk is not marked as in use, but neither part of any bin
# or fastbin
error_base_string.format("freed", chunk.v(), "not ")
return False
def _allocated_chunks_for_mmapped_chunk(self, mmap_first_chunk):
"""Returns all allocated chunks for the mmap region the given chunk
belongs to."""
# Generator: chunks are walked with iterate_through_chunks up to the end
# address of the vma containing mmap_first_chunk (mmap_vma[1]) and yielded
# while _check_and_report_mmapped_chunk and _check_and_report_chunksize
# succeed.
# NOTE(review): the logging call for the invalid-pointer message, the
# remaining arguments of _check_and_report_chunksize and the else branch
# that stops the walk are missing in this copy; confirm against the upstream
# file.
if not mmap_first_chunk:
"_allocated_chunks_for_mmapped_chunk has been called with "
"invalid pointer.")
mmap_vma = get_vma_for_offset(self.vmas, mmap_first_chunk.v())
current_border = mmap_vma[1]
# we can't check here for hitting the bottom, as mmapped regions can
# contain slack space but this test is in essence done in
# check_and_report_mmap_chunk
for curr_chunk in self.iterate_through_chunks(
mmap_first_chunk, current_border):
if self._check_and_report_mmapped_chunk(curr_chunk, mmap_vma) \
and self._check_and_report_chunksize(curr_chunk,
yield curr_chunk
# As the checks for the last MMAPPED chunk reported an error,
# we are stopping walking the MMAPPED chunks for that vm_area.
def get_all_mmapped_chunks(self):
"""Returns all allocated MMAPPED chunks."""
# Generator: chunks already stored in main_arena.allocated_mmapped_chunks
# are yielded directly; the walk over main_arena.mmapped_first_chunks via
# _allocated_chunks_for_mmapped_chunk is the other visible path.
# NOTE(review): an else line, the argument of
# _allocated_chunks_for_mmapped_chunk and presumably the code storing chunks
# into the fresh list are missing in this copy; confirm against the upstream
# file.
main_arena = self.get_main_arena()
if main_arena:
if main_arena.allocated_mmapped_chunks:
for chunk in main_arena.allocated_mmapped_chunks:
yield chunk
main_arena.allocated_mmapped_chunks = list()
for mmap_first_chunk in main_arena.mmapped_first_chunks:
for chunk in self._allocated_chunks_for_mmapped_chunk(
yield chunk
def get_aligned_address(self, address, different_align_mask=None):
    """Rounds the given chunk address up to the next aligned address.

    Alignment is evaluated on the data offset of the chunk (address
    converted with _address2mem), and the result is converted back with
    _mem2address. A falsy different_align_mask selects
    self._malloc_align_mask.
    """
    mask = different_align_mask or self._malloc_align_mask
    mem_address = self._address2mem(address)
    aligned_mem = (mem_address + mask) & ~mask

    return self._mem2address(aligned_mem)
######### code taken from malloc/malloc.c (glibc-2.23)
def _chunk2mem(self, chunk):
"""Returns the offset of the fd member (start of the data for a non
freed chunk). This is mainly used in conjunction with _aligned_ok, as
Glibc checks the alignment of chunks based on the data/fd offset."""
return (chunk.v() + self._chunk_fd_member_offset)
def _aligned_ok(self, value):
"""Returns True if the given address/size is aligned."""
return (value & self._malloc_align_mask) == 0
# essentially the request2size macro code
def get_aligned_size(self, size):
    """Returns the final chunk size Glibc would use for a request of the
    given size (essentially the request2size macro).

    E.g. calling malloc(16) will on all platforms result in a chunk with
    a size > 16. This is useful to find chunks whose size probably fits the
    data of interest: for a struct of size 48, the resulting chunk size is
    get_aligned_size(48).
    """
    mask = self._malloc_align_mask
    padded = size + self._size_sz + mask

    # Requests that are too small are bumped to the minimum chunk size.
    if padded < self._minsize:
        return self._minsize & ~mask

    return int(padded) & ~mask
# essentially an implementation of the call
# fastbin_index (request2size (MAX_FAST_SIZE)) + 1)
def _get_nfastbins(self):
"""Returns the number of fastbins. This count depends on the one hand
on the architecture and on the other hand on the MALLOC_ALIGNMENT
max_fast_size = (80 * self._size_sz) / 4
shift_bits = 4 if self._size_sz == 8 else 3
return ((self.get_aligned_size(max_fast_size) >> shift_bits) - 2) + 1
def _address2mem(self, address):
"""Essentially the same as _chunk2mem but works on an address instead
of a chunk."""
return address + self._chunk_fd_member_offset
def _mem2address(self, address):
"""Reverse function of _address2mem."""
return address - self._chunk_fd_member_offset
def _check_mmap_alignment(self, address):
"""Returns True if the given address is aligned according to the
minimum pagesize."""
return (address & (self._min_pagesize - 1)) == 0
def _get_page_aligned_address(self, address):
"""Returns an address aligned to the internal pagesize.
The given address should be a number, not a chunk.
This function is primarily used in the context of MMAPPED chunks.
return (address + self._min_pagesize - 1) & ~ (self._min_pagesize - 1)
def _check_for_bottom_chunks(self, chunk, heap_end):
"""Checks the current chunk for conditions normally only found on the
second last chunk of a heap, when there are more heaps following.
if chunk.chunksize() <= self._minsize and \
(chunk.v() + chunk.chunksize() + (self._size_sz * 2)) == \
return True
return False
def _is_bottom_chunk(
self, chunk, next_chunk, current_border, is_last_heap):
"""Returns true if the given chunk is the first of the bottom
chunks, located at the end of a thread arena's heap segment, described
by a heap_info struct."""
# Visible flow: chunks bigger than self._minsize are rejected; the end of the
# chunk plus 2 * self._size_sz is compared against current_border and, if
# self._front_misalign_correction is set, also against current_border minus
# 2 * self._size_sz. A matching gap is recorded in self._bottom_chunk_gaps.
# NOTE(review): the right-hand side of the "in" test on possible_borders, the
# logging calls around the message literals and at least one else line are
# missing in this copy; confirm against the upstream file.
# on multiple heaps, for all but the last heap, the old
# top chunk is divided in at least two chunks at the
# bottom, where the second last has a size of
# minimum 2 * SIZE_SZ and maximum MINSIZE the last chunk
# has a size of 2* SIZE_SZ while the size field is set
# to 0x1 (the PREV_INUSE bit is set) and the prev_size
# contains the second last chunks size
# (min: 2 * SIZE_SZ , max: MINSIZE)
# see the part for creating a new heap within the
# sysmalloc function in malloc/malloc.c
# For glibc-2.23 beginning with line 2417
# as this behavior is included since version 2.0.1 from
# 1997, it should be safe to rely on it for most glibc
# versions
# the first bottom chunk is not bigger than MINSIZE
if not chunk.chunksize() <= self._minsize:
return False
# with a different MALLOC_ALIGNMENT, there might be a gap after the
# last bottom chunk and the end of the memory region
possible_borders = [current_border]
if self._front_misalign_correction:
possible_borders.append(current_border - (self._size_sz * 2))
if not (chunk.v() + chunk.chunksize() + (self._size_sz * 2)) in \
# the chunk is not on the edge to the current heap region
if chunk.chunksize() < self._minsize:
"Unexpected: We hit a chunk at offset 0x{0:x} "
"with a size smaller than the default minimum "
"size for a chunk but which appears to be "
"not part of the typical end of a heap. This "
"might either indicate a fatal error, or "
"maybe a custom libc implementation/custom "
"compile time flags.".format(chunk.v()))
return False
# from here on, everything looks like we are at the bottom chunks
# The last condition also tests if there are further heaps following.
# - if not, the current chunk which is only
# size_sz * 2 bytes away from the current heap's end
# shouldn't be that small and indicates an error
# - heap border shouldn't normally exist
if next_chunk.chunksize() == 0 and next_chunk.prev_inuse() and \
(next_chunk.get_prev_size() == chunk.chunksize()) and \
not is_last_heap:
if len(possible_borders) > 1 and chunk.v() + chunk.chunksize() \
+ (self._size_sz * 2) == possible_borders[1]:
self._bottom_chunk_gaps[chunk.v()] = \
self._size_sz * 2
return True
"Unexpected: We hit a chunk at offset 0x{0:x} "
"which presumably should have been the second "
"last chunk of that heap, but some conditions "
"don't meet.".format(chunk.v()))
def _allocated_chunks_for_thread_arena(self, arena):
"""Returns all allocated chunks contained in all heaps for the given
arena, assuming the arena is not the main_arena."""
# Generator: chunks already stored in arena.allocated_chunks are yielded
# directly. Otherwise every heap in arena.heaps is walked with
# next_chunk_generator up to heap.v() + heap.size, treating the top chunk and
# the bottom chunks (_is_bottom_chunk) as the end of a heap.
# NOTE(review): the logging calls, break/else/return lines, part of the
# _is_bottom_chunk argument list and the statements that store chunks into
# arena.allocated_chunks are missing in this copy; confirm against the
# upstream file.
if arena.is_main_arena:
"Unexpected: This method has been called with the main_arena.")
# since main_arena doesn't contain heap_infos, we return here
if arena.allocated_chunks:
for chunk in arena.allocated_chunks:
yield chunk
elif self._preserve_chunks:
arena.allocated_chunks = list()
heap_count = len(arena.heaps)
for i in list(range(heap_count)):
heap = arena.heaps[i]
current_border = heap.v() + heap.size
hit_heap_bottom = False
last_chunk = None
curr_chunk = None
for next_chunk in heap.first_chunk.next_chunk_generator():
if not curr_chunk:
curr_chunk = next_chunk
last_chunk = curr_chunk
if (curr_chunk.v() + curr_chunk.chunksize()) == current_border:
# we hit the top chunk
elif (curr_chunk.v() + curr_chunk.chunksize()) <= \
"The chunk at offset 0x{:x} has a size <= 0, which "
"is unexpected and indicates a problem. Since we can't"
" iterate the chunks anymore, we abort for this chunk."
elif self._is_bottom_chunk(curr_chunk,
i == (heap_count - 1)):
# we probably hit the bottom of the current heap
# which should'nt be the last one
"We hit the expected two chunks at the bottom "
"of a heap. This is a good sign.")
hit_heap_bottom = True
curr_chunk.is_bottom_chunk = True
if self._preserve_chunks:
yield curr_chunk
# normal chunk, not located at the bottom of the heap
elif self._check_and_report_allocated_chunk(
arena, curr_chunk, next_chunk, current_border):
curr_chunk, next_chunk.prev_inuse())
if self._preserve_chunks:
yield curr_chunk
curr_chunk = next_chunk
if not hit_heap_bottom and \
(last_chunk.v() + last_chunk.chunksize()) < current_border:
"Seems like we didn't hit the top chunk or the bottom of "
"the current heap at offset: 0x{0:x}".format(heap.v()))
# Generator: chunks already stored in arena.allocated_chunks are yielded
# directly. Otherwise the main heap is walked from arena.first_chunk with
# next_chunk_generator; the upper border is arena.first_chunk.v() +
# arena.system_mem - self._front_misalign_correction when system_mem > 0.
# NOTE(review): in this copy the docstring is not closed and the logging
# calls, return/break/else lines, the fallback border expression and the
# statements that store chunks into arena.allocated_chunks are missing;
# confirm against the upstream file.
def _allocated_chunks_for_main_arena(self):
"""Returns all allocated chunks for the main_arena's heap.
mmap'ed regions are not included.
arena = self.get_main_arena()
if arena.allocated_chunks:
for chunk in arena.allocated_chunks:
yield chunk
current_border = 0
if self._preserve_chunks:
arena.allocated_chunks = list()
if arena.first_chunk and arena.first_chunk.chunksize() > 0:
# as the main heap can spread among multiple vm_areas, we take
# the system_mem value as the upper boundary
if arena.system_mem > 0:
# system_mem includes the potential gap, resulting from
# a different MALLOC_ALIGNMENT, so we subtract
# self._front_misalign_correction
current_border = arena.first_chunk.v() + arena.system_mem \
- self._front_misalign_correction
# there have been rare scenarios, in which the system_mem
# value was 0
"Unexpected: system_mem value of main arena is <= 0. "
"We will calculate it with the top chunk. This will "
"lead to follow up warnings regarding size "
current_border = +
last_chunk = None
curr_chunk = None
for next_chunk in arena.first_chunk.next_chunk_generator():
last_chunk = curr_chunk
if not curr_chunk:
curr_chunk = next_chunk
if (curr_chunk.v() + curr_chunk.chunksize()) \
== current_border:
# reached top chunk
if self._check_and_report_allocated_chunk(
arena, curr_chunk, next_chunk, current_border):
if self._preserve_chunks:
yield curr_chunk
curr_chunk = next_chunk
if (last_chunk.v() + last_chunk.chunksize()) < current_border:
self.session.logging.warn("Seems like we didn't hit the "
"top chunk for main_arena.")
elif arena.first_chunk and arena.first_chunk.chunksize() == 0:
if not self._libc_offset:
"The first main arena chunk seems to have a zero "
"size. As we didn't find a mapped libc module, the "
"reason might be a statically linked executable. "
"Please provide offset for the malloc_par struct "
"(symbol name is 'mp_'). Another reason might be "
"swapped memory pages.")
"Unexpected error: The first main arena chunk "
"seems to have a zero size. The reason might be "
"swapped memory pages. Walking the chunks is aborted.")
# Generator: dispatches to _allocated_chunks_for_main_arena when
# arena.is_main_arena is set and to _allocated_chunks_for_thread_arena
# otherwise.
# NOTE(review): in this copy the docstring is not closed, and the logging
# calls, the bodies of the two debug guards and the else/return lines are
# missing; confirm against the upstream file.
def get_all_allocated_chunks_for_arena(self, arena):
"""Returns all allocated chunks for a given arena.
This function is basically a wrapper around
_allocated_chunks_for_main_arena and allocated_chunks_for_thread_arena.
if not arena:
"Error: allocated_chunks_for_arena called with an empty arena")
if self.session.GetParameter("debug"):
if arena.freed_fast_chunks is None or arena.freed_chunks is None:
"Unexpected error: freed chunks seem to not be initialized.")
if self.session.GetParameter("debug"):
if arena.is_main_arena:
for i in self._allocated_chunks_for_main_arena():
yield i
# not main_arena
for chunk in self._allocated_chunks_for_thread_arena(arena):
yield chunk
# at least the function depends on getting allocated chunks first and then
# freed chunks, so this order shouldn't be changed
def get_all_chunks(self):
    """Yields all chunks: first everything from get_all_allocated_chunks
    (allocated and MMAPPED chunks), then everything from
    get_all_freed_chunks."""
    # The order matters: allocated chunks have to be gathered before the
    # freed ones.
    for chunk_source in (self.get_all_allocated_chunks,
                         self.get_all_freed_chunks):
        for chunk in chunk_source():
            yield chunk
def get_all_allocated_main_chunks(self):
    """Returns all allocated chunks belonging to the main arena (excludes
    thread and MMAPPED chunks)."""
    main_arena = self.get_main_arena()

    for chunk in self.get_all_allocated_chunks_for_arena(main_arena):
        yield chunk
def get_all_allocated_thread_chunks(self):
    """Returns all allocated chunks which belong to a thread arena.

    Nothing is yielded if there is no main arena.
    """
    if not self.get_main_arena():
        return

    for arena in self.arenas:
        if arena.is_main_arena:
            continue

        for chunk in self.get_all_allocated_chunks_for_arena(arena):
            yield chunk
def get_all_allocated_chunks(self):
    """Yields every allocated chunk: first the chunks of each arena in
    self.arenas, then the MMAPPED chunks."""
    if not self.get_main_arena():
        return

    for arena in self.arenas:
        for chunk in self.get_all_allocated_chunks_for_arena(arena):
            yield chunk

    for mmapped_chunk in self.get_all_mmapped_chunks():
        yield mmapped_chunk
def get_all_freed_tcache_chunks(self):
    """Returns all freed tcache chunks, no matter to what arena they
    belong.

    Nothing is yielded if there is no main arena.
    """
    if not self.get_main_arena():
        return

    for arena in self.arenas:
        for free_chunk in arena.freed_tcache_chunks:
            yield free_chunk
def get_all_freed_fastbin_chunks(self):
    """Returns all freed fastbin chunks, no matter to what arena they
    belong.

    Nothing is yielded if there is no main arena.
    """
    if not self.get_main_arena():
        return

    for arena in self.arenas:
        for free_chunk in arena.freed_fast_chunks:
            yield free_chunk
def get_all_freed_bin_chunks(self):
    """Yields the chunks in freed_chunks of every arena in self.arenas.
    Nothing is yielded if there is no main arena."""
    if not self.get_main_arena():
        return

    for arena in self.arenas:
        for binned_chunk in arena.freed_chunks:
            yield binned_chunk
def get_all_freed_chunks(self):
    """Yields, in this order, all freed fastbin chunks, all freed tcache
    chunks, all freed bin chunks and finally the top chunk of every arena
    that has one, no matter to what arena they belong."""
    if not self.get_main_arena():
        return

    freed_sources = (self.get_all_freed_fastbin_chunks,
                     self.get_all_freed_tcache_chunks,
                     self.get_all_freed_bin_chunks)

    for freed_source in freed_sources:
        for freed_chunk in freed_source():
            yield freed_chunk

    for arena in self.arenas:
        if arena.top_chunk:
            yield arena.top_chunk
def _last_heap_for_vma(self, vma):
"""Returns the last heap_info within the given vma."""
heap_hit = None
if not self.get_main_arena:
return None
for arena in self.arenas:
for heap in arena.heaps:
if vma[0] <= heap.v() < vma[1]:
if not heap_hit or heap.v() > heap_hit.v():
heap_hit = heap
return heap_hit
def heap_for_ptr(self, ptr):
    """Returns the heap from the internal heap lists, the given pointer
    belongs to.

    ptr may be a plain number or an object offering v(). Returns None if
    there is no main arena or if no heap in self.arenas contains the
    pointer.
    """
    # get_main_arena has to be called: the bare method object is always
    # truthy, so testing it would never skip the search.
    if not self.get_main_arena():
        return None

    ptr_offset = ptr if isinstance(ptr, Number) else ptr.v()

    for arena in self.arenas:
        for heap in arena.heaps:
            if heap.v() <= ptr_offset < (heap.v() + heap.size):
                return heap

    return None
# We don't use the code from glibc for this function, as it depends on the
# HEAP_MAX_SIZE value and we might not have the correct value
# Visible flow: returns None without a loaded libc profile or when no vm_area
# contains ptr; otherwise a _heap_info struct is created at the start of the
# vm_area (vma[0]) and advanced by its size while it ends below ptr.
# NOTE(review): in this copy the docstring is not closed, and the logging
# calls, the vm= arguments of the _heap_info constructors and the second half
# of the arena-pointer condition (presumably the suppress_warning test) are
# missing; confirm against the upstream file.
def _heap_for_ptr(self, ptr, suppress_warning=False):
"""Returns a new heap_info struct object within the memory region, the
given pointer belongs to. If the vm_area contains multiple heaps it
walks all heap_info structs until it finds the corresponding one.
if not self._libc_profile_success:
"Libc profile is not loaded, hence no struct or constant "
"information. Aborting")
return None
vma = get_vma_for_offset(self.vmas, ptr)
if not vma:
"No vm_area found for the given pointer 0x{:x}."
return None
heap_info = self.profile._heap_info(offset=vma[0],
# there might be at least two heaps in one vm_area
while heap_info.v() + heap_info.size < ptr:
heap_info = self.profile._heap_info(
offset=heap_info.v() + heap_info.size,
if heap_info.ar_ptr not in self.arenas and not \
"The arena pointer of the heap_info struct gathered "
"from the given offset {0:x} does not seem to point "
"to any known arena. This either indicates a fatal "
"error which probably leads to unreliable results "
"or might be the result from using a pointer to a "
"MMAPPED region.".format(ptr)
return heap_info
def _get_number_of_cores(self):
    """Returns the number of online cpus reported by the CpuInfo plugin for
    the current memory image."""
    cpu_plugin = cpuinfo.CpuInfo(session=self.session)
    online_cpus = list(cpu_plugin.online_cpus())

    return len(online_cpus)
def _get_max_number_of_arenas(self):
"""Returns the maximum number of supported arenas. This value depends
on the number of cpu cores."""
cores = self._get_number_of_cores()
return cores * (2 if self._size_sz == 4 else 8)
def _check_arenas(self, arena, deactivate_swap_check=False):
"""Iterates the next field of the malloc_state struct and checks if we
end up at the same malloc_state after the maximum number of arenas for
the current system. Checks also for arena structs being part of
swapped memory pages."""
# Visible flow: the arena limit is read from self.mp_.arena_max; a value of 0
# is replaced by _get_max_number_of_arenas(), plus one on single core
# systems. The loop runs at most arena_max times and returns True once the
# walk arrives back at the given arena.
# NOTE(review): the logging calls, the swap-check calls inside the
# "if not deactivate_swap_check" guards and the expression that advances
# curr_arena are missing in this copy, so the nesting of the return
# statements cannot be recovered from here; confirm against the upstream
# file.
# This function is only reliable, if we have the offset to mp_
if not self.mp_:
# at least we test arena for being swapped
if not deactivate_swap_check:
return None
# max arena value can be adjusted at runtime via mallopt func:
# see malloc/malloc.c line 4753 and
# or on startup via env vars (see also link)
# if not, this member is 0
arena_max = self.mp_.arena_max
if arena_max > 0x1000:
"The maximum number of arenas, gathered from the malloc_par "
"struct is unexpected high ({:d}). The reason might be a "
"wrong mp_ offset and will in this case, most probably, lead "
"to follow up errors. (PID: {:d})"
if arena_max == 0:
# The maximum number of arenas is calculated with the macro
# 'NARENAS_FROM_NCORES' - See malloc/arena.c
arena_max = self._get_max_number_of_arenas()
cores = self._get_number_of_cores()
# In the case of one core, there can be one more arena than
# the result from 'NARENAS_FROM_NCORES'
# See function 'arena_get2' in malloc/arena.c
if cores == 1:
arena_max += 1
if arena_max == 0:
"The result for arena_max has been 0. This shouldn't be "
"the case and has to be looked into.")
if not deactivate_swap_check:
# as the following for loop will in this case not loop over
# any arena, we check the current arena at least for being
# swapped
curr_arena = arena
for _ in range(arena_max):
swap_check_result = self._check_and_report_arena_for_being_swapped(
curr_arena) if not deactivate_swap_check else None
if swap_check_result is not True:
curr_arena =
if arena == curr_arena:
return True
return False
def __init__(self, **kwargs):
    """Creates the plugin instance and gives every piece of per-task
    analysis state its default value. All keyword arguments are handed
    unchanged to the parent constructor."""
    super(HeapAnalysis, self).__init__(**kwargs)

    self._libc_profile_success = False
    self._libc_offset = None
    self.arenas = []
    self.process_as = None

    # vmas: every vma of the current task
    # heap_vmas: only those vmas we consider part of the task's heap
    self.vmas = None
    self.heap_vmas = None

    self._size_sz = None
    self._malloc_alignment = None
    self._malloc_align_mask = None
    self._minsize = 0

    self.mp_ = None
    self.mp_offset = self.plugin_args.malloc_par
    self._mmapped_warnings = set()

    # Statically linked binaries: distance from the beginning of the main
    # arena until the first chunk, taken from self.mp_.sbrk_base. It also
    # marks the beginning of the arena's memory and hence influences the
    # system_mem value.
    self._static_bin_first_chunk_dist = 0
    self._is_probably_static_bin = False

    # Comes into play when MALLOC_ALIGNMENT has been modified via compile
    # time options, leading to a gap at the beginning of memory regions
    # that contain chunks (so they are still aligned). The first chunk
    # then no longer starts right at the beginning of the heap region but
    # a few bytes after.
    # see also malloc/malloc.c lines 2669 ff. (for main heap) resp.
    # lines 2363 ff. for mmapped chunks
    self._front_misalign_correction = 0

    # With a different MALLOC_ALIGNMENT there might be a gap after the
    # last bottom chunk and the end of the memory region.
    self._bottom_chunk_gaps = {}

    self.task = None
    self.statistics = None
    self._heap_slack_space = {}
    self._preserve_chunks = False
    self._min_pagesize = 4096
    self._pt_regs_offset = self._get_pt_regs_offset()

    # SIZE_SZ follows the architecture of the session profile; for any
    # other architecture it keeps the None assigned above.
    arch_is_i386 = self.session.profile.metadata("arch") == 'I386'
    if arch_is_i386:
        self._size_sz = 4
    elif self.session.profile.metadata("arch") == 'AMD64':
        self._size_sz = 8

    self._chunk_fd_member_offset = 0
    self._pointer_size = 0
    self._has_dummy_arena = False
    self._libc_version = ''
    self._carve_method = ''
def _initialize_malloc_alignment(self, malloc_alignment=None):
"""This function initializes MALLOC_ALIGNMENT and variables that are in
relation to MALLOC_ALIGNMENT."""
# NOTE(review): in this view the method has no indentation and at least
# one line looks missing (flagged below); restore from the upstream file
# before relying on this code.
# if not given as argument, we check if malloc_alignment has been
# already set (could be done by _check_and_correct_first_chunk_offset).
# If this is not the case, we try to load it from the profile
# NOTE(review): the backslash continuation below is followed only by
# comments, so the second operand of the "or" (presumably the profile
# lookup mentioned above) is not visible here -- confirm.
if not malloc_alignment:
malloc_alignment = self._malloc_alignment or \
##### taken from malloc/malloc.c (glibc-2.23)
# depending on glibc comment, malloc_alignment differs only on
# powerpc32 from 2*SIZE_SZ
self._malloc_alignment = malloc_alignment if malloc_alignment \
else self._size_sz * 2
# the mask is alignment - 1 and is used below to round sizes up
self._malloc_align_mask = self._malloc_alignment - 1
# MIN_LARGE_SIZE defines at which size the fd/bk_nextsize pointers
# are used
# falls back to 64 small bins if the profile provides no constant
nsmallbins = self.profile.get_constant('NSMALLBINS')
if not nsmallbins:
nsmallbins = 64
smallbin_width = self._malloc_alignment
smallbin_correction = 1 if self._malloc_alignment > 2 * self._size_sz \
else 0
# NOTE(review): as visible here this binds a local name; presumably a
# module-level _MIN_LARGE_SIZE is meant (a "global" statement may have
# been lost) -- confirm.
_MIN_LARGE_SIZE = ((nsmallbins - smallbin_correction) * smallbin_width)
# minimal chunk size: offset of fd_nextsize, rounded up to the alignment
min_chunk_size = self.profile.get_obj_offset(
"malloc_chunk", "fd_nextsize")
self._minsize = (min_chunk_size + self._malloc_align_mask) \
& ~ self._malloc_align_mask
def _check_and_correct_fastbins_count(self):
"""Checks if the current MALLOC_ALIGNMENT value changes the count of
fastbins and if so, we update the malloc_state struct.
Returns true, if it did change the malloc_state struct."""
# NOTE(review): in this view the method has no indentation and several
# lines look truncated: the logging call opener before the message, the
# value assigned to the 'count' entry, and the remaining arguments of the
# _correct_vtype_offsets call. Restore from the upstream file.
if self._get_nfastbins() != len(self.profile.malloc_state().fastbinsY):
"The current count of fastbinsY does not meet the calculated "
"one, which most probably is a result from a different value "
"for MALLOC_ALIGNMENT. We try to fix this and if no "
"warnings/errors occur, we probably succeded.")
malloc_state_dict = self.profile.vtypes['malloc_state']
malloc_state_dict[1]['fastbinsY'][1][1]['count'] = \
correction_size = \
(self._get_nfastbins() -
len(self.profile.malloc_state().fastbinsY)) \
* self._pointer_size
malloc_state_dict = self._correct_vtype_offsets(malloc_state_dict,
self.profile.add_types({'malloc_state': malloc_state_dict})
return True
return False
def _check_and_correct_heapinfo_pad(self):
"""Checks if the current MALLOC_ALIGNMENT value changes the count of
pad chars and if so, we update the heap_info struct."""
# NOTE(review): in this view the method has no indentation and several
# lines look truncated: the logging call opener before the message, the
# subtrahend of correction_size, and the remaining arguments of the
# _correct_vtype_offsets call. Restore from the upstream file.
##### taken from malloc/arena.c line 1046 (glibc-2.23)
expected_pad_size = (-6 * self._size_sz) & self._malloc_align_mask
if expected_pad_size != len(self.profile._heap_info().pad):
"The current size of the heap_info pad field does not meet "
"the expected size, which most probably is a result from a "
"different value for MALLOC_ALIGNMENT. We try to fix this and "
"if no warnings/errors occur, we probably succeded.")
heap_info_dict = self.profile.vtypes['_heap_info']
heap_info_dict[1]['pad'][1][1]['count'] = expected_pad_size
correction_size = expected_pad_size - \
heap_info_dict = self._correct_vtype_offsets(heap_info_dict,
self.profile.add_types({'_heap_info': heap_info_dict})
def _correct_vtype_offsets(self, vtype_dict, correction_size, first_key):
# first we correct the struct size
vtype_dict[0] = vtype_dict[0] + correction_size
# we first need to get the offset for the first key after which
# offsets should be changed
first_key_offset = vtype_dict[1][first_key][0]
for value in list(vtype_dict[1].values()):
if value[0] > first_key_offset:
value[0] = value[0] + correction_size
return vtype_dict
# Goes to the top chunk of a given arena, gets its heap_info offset and
# follows all _heap_info.prev members until the last one (for the last
# _heap_info, the prev field is 0x0).
def _heaps_for_arena(self, arena):
"""Returns a sorted list of all heap_info structs for a given arena:
[0] = first heap_info.
This method is normally only called on initialization for a new task
and further access to heaps is done via the heaps attribute of each
arena."""
# NOTE(review): in this view the method has no indentation and the
# logging call openers before both messages appear to be missing;
# restore from the upstream file before relying on this code.
heap_infos = list()
if arena.top_chunk:
# the heap_info containing the top chunk is the arena's last heap
last_heap_info = self._heap_for_ptr(arena.top_chunk.v())
if not last_heap_info:
"Unexpected error: We didn't find a heap_info struct "
"for the given top chunk. The reason might be swapped "
"pages or wrong debug information. It will lead to "
"further errors.")
return None
if not last_heap_info.ar_ptr.dereference() == arena:
"Unexpected error: current heap_info's arena pointer "
"doesn't point to the expected arena. Maybe wrong "
"profile or different cause.")
# walking 'prev' starts at the last heap, so the list is reversed
heap_infos = list(last_heap_info.walk_list('prev'))[::-1]
return heap_infos
def get_main_arena(self):
"""Returns the main_arena for the current task, which is the first
arena in the arenas list. If the current instance is not initialized,
it logs a warning."""
# NOTE(review): in this view the method has no indentation; the logging
# call openers, the ends of both messages and presumably an "else:" line
# are not visible. Restore from the upstream file.
if self.arenas:
if self.arenas[0].is_main_arena:
return self.arenas[0]
"First arena in the arenas list doesn't seem to be the "
"There are no arenas. Maybe this instance has not been "
"initialized for the current task. Try to initialize it via "
return None
def _initialize_arenas(self, main_arena):
"""Gathers all arenas, their heaps and sets main_arenas first chunk."""
# NOTE(review): in this view the method has no indentation and several
# lines look truncated or missing: the first argument of
# get_aligned_address, the logging call opener and .format arguments of
# the message, presumably some "else:" lines, and whatever adds each
# arena to self.arenas. Restore from the upstream file.
main_arena.is_main_arena = True
for arena in main_arena.walk_list('next'):
if arena.is_main_arena:
main_arena.mmapped_first_chunks = set()
# address range of the vma(s) named like the main heap identifier
main_arena_range = get_mem_range_for_regex(
self.vmas, re.escape(self._main_heap_identifier))
if main_arena_range:
offset = self.get_aligned_address(
+ self._static_bin_first_chunk_dist)
main_arena.first_chunk = self.profile.malloc_chunk(
offset, vm=self.process_as)
"The current process {:d} doesn't seem to have a main "
"heap. There are multiple possible explanations for "
"that: 1. The program uses another heap implementation"
" (e.g. Mozilla products). 2. The process didn't touch"
" the heap at all (didn't allocate any chunks within "
"the main thread). 3. We were unable to correctly "
"identify the main heap. One verification possibility "
"is to check with the 'maps' plugin, whether or not "
"this process seems to have a heap."
# in this implementation, thread arenas don't use the
# first_chunk member, but their heaps keep them
arena.heaps = self._heaps_for_arena(arena)
def _initialize_dummy_main_arena(self):
"""Creates a dummy arena, initializes relevant variables and manually
walks the main heap vma and adds all chunks to the allocated and freed
chunks lists."""
# NOTE(review): in this view the method has no indentation and several
# lines look truncated or missing: the assignment of
# dummy_arena.first_chunk (it is read below but never set here), the
# remaining arguments of _check_and_report_chunksize, the logging call
# openers, the bodies that append chunks to the chunk lists, presumably
# some "continue"/"break"/"else:" lines, and the registration of the
# dummy arena. Restore from the upstream file.
self._has_dummy_arena = True
dummy_arena = self.profile.malloc_state()
main_arena_range = get_mem_range_for_regex(
self.vmas, re.escape(self._main_heap_identifier))
# There might be scenarios in which there is no main heap but only
# mmapped chunks. In this case, main_arena_range is None.
if main_arena_range:
dummy_arena.system_mem = main_arena_range[1] - main_arena_range[0]
# we activate chunk preservation (if not prevented via cmdline
# option), as we have to walk all chunks at this point anyways
if self._preserve_chunks:
dummy_arena.allocated_chunks = list()
curr_chunk = None
# while there will be no freed chunk to gather, we still test for
# it as we need to walk the chunks anyways to get to the top chunk
for next_chunk in dummy_arena.first_chunk.next_chunk_generator():
if not curr_chunk:
curr_chunk = next_chunk
# a chunk ending exactly at the end of the main heap range with a
# non-zero size is treated as the top chunk
if (curr_chunk.v() + curr_chunk.chunksize()) \
== main_arena_range[1] and curr_chunk.get_size() > 0x0:
# we hit top chunk
curr_chunk.is_top_chunk = True
dummy_arena.top_chunk = curr_chunk
if not self._check_and_report_chunksize(next_chunk,
"Seems like we are not walking a valid glibc heap, so "
"we are stopping right now.")
# the PREV_INUSE bit of the following chunk tells whether curr_chunk
# is allocated
is_in_use = next_chunk.prev_inuse()
if (curr_chunk.v() + curr_chunk.chunksize()) \
< main_arena_range[1] and not is_in_use:
curr_chunk.is_bin_chunk = True
elif self._preserve_chunks:
curr_chunk = next_chunk
# sanity check: the end of the top chunk should match system_mem
if dummy_arena.top_chunk:
end = dummy_arena.top_chunk.v() \
+ dummy_arena.top_chunk.chunksize()
if dummy_arena.system_mem != end - main_arena_range[0]:
"Unexpected mismatch: memory range for main heap "
"is not equal to the range calculated with the top "
"chunk. This is unexpected, indicates a problem and "
"will most probably lead to unreliable results.")
def _mark_heap_vm_areas(self):
    """Tags each vm_area that holds a known heap_info struct with
    '_heap_vma_identifier', a flag other functions rely on.
    Usually _get_vmas_for_task already does the marking, but that step
    fails when there is neither an offset for the main arena nor a main
    heap."""
    # gather all heap_infos of all arenas up front
    heap_infos = []
    for arena in self.arenas:
        heap_infos.extend(arena.heaps)

    for heap_info in heap_infos:
        hit = get_vma_for_offset(self.vmas, heap_info.v())
        if not hit:
            continue

        # re-insert the same range with the renamed vma data
        data = hit[2]
        data['name'] = self._heap_vma_identifier
        self.vmas.insert(hit[0], hit[1], data)
def _check_heap_consistency(self):
"""Searches manually for heap_info structs on every potential heap
area memory region, which points to a known arena. If it finds one
that is not part of the already known heaps, it prints a warning."""
# NOTE(review): in this view the method has no indentation and several
# lines look truncated or missing: the statements that add findings to
# temp_heaps, the vm= argument closing the _heap_info call, presumably
# a "break" after the prev-walk check, and the logging call opener of
# the final message. Restore from the upstream file.
known_heaps = [heap for arenas in self.arenas for heap in arenas.heaps]
temp_heaps = set()
# only vmas flagged as heap or potential mmapped regions are examined
for vm_start, vm_end, vm_data in self.vmas:
name = vm_data['name']
if name == self._heap_vma_identifier or \
name == self._pot_mmapped_vma_identifier:
heap_info = self._heap_for_ptr(vm_start, suppress_warning=True)
if heap_info.ar_ptr in self.arenas:
if heap_info not in known_heaps:
# a vma can hold several heap_infos back to back; step through them
# by size while they still point to a known arena
while heap_info.v() + heap_info.size < vm_end \
and heap_info.ar_ptr in self.arenas:
heap_info = self.profile._heap_info(
offset=heap_info.v() + heap_info.size,
if heap_info.ar_ptr in self.arenas \
and heap_info not in known_heaps:
additional_heaps = set()
for temp_heap_info in temp_heaps:
for heap_info in temp_heap_info.walk_list('prev'):
if heap_info not in known_heaps:
additional_heaps = additional_heaps.union(temp_heaps)
if additional_heaps:
"We probably found at least one heap, which is not part of "
"our internal list. This shouldn't be the case, indicates a "
"problem and will lead to unreliable results. The offset(s) "
"of the additional heap(s) is/are: "
+ ("0x{:x} " * len(additional_heaps))
.format(*[heap.v() for heap in additional_heaps]))
def _check_and_correct_empty_space_in_heaps(self):
    """Records slack space behind the last heap of thread arenas.

    The last heap of an arena may be followed by space that the top chunk
    does not cover, which makes the compare_vma_sizes_with_chunks function
    report deviating results. For every non main arena, the size of such
    an area is stored in the _heap_slack_space attribute, keyed by the
    heap."""
    for arena in self.arenas:
        if arena.is_main_arena:
            continue

        # One vma may share heap_infos from different arenas, so we take
        # the last heap_info of the vma holding the top chunk and test
        # that one for slack space.
        vma = get_vma_for_offset(self.vmas, arena.top_chunk.v())
        last_heap = self._last_heap_for_vma(vma)
        vma_end = vma[1]

        if last_heap.v() + last_heap.size < vma_end:
            heap_end = last_heap.v() + last_heap.size
            self._heap_slack_space[last_heap] = vma_end - heap_end
def _search_first_chunk(self, start_offset):
"""Searches from the given start_offset for indicators of the first
chunk and returns its offset or None if it couldn't be found."""
# NOTE(review): in this view the method has no indentation and the first
# assignment to temp has lost its right-hand side; presumably it reads
# size_sz bytes at first_chunk_offset from the process address space --
# confirm against the upstream file.
first_chunk_offset = start_offset
# at most 8 steps of size_sz bytes are examined
for _ in range(8):
temp =,
# 'I' unpacks 4 bytes, 'Q' 8 bytes
temp = struct.unpack('I' if self._size_sz == 4 else 'Q', temp)[0]
# the first member of the malloc_chunk is the prev_size
# field, which should be 0x0 for the first chunk and the
# following member is size which should be > 0x0.
# so the first non-zero value is taken as the size field and the chunk
# starts one size_sz before it
if temp != 0x0:
first_chunk_offset -= self._size_sz
return first_chunk_offset
first_chunk_offset += self._size_sz
return None
def _initialize_heap_first_chunks(self):
"""Gathers the first chunk for each heap and sets it as first_chunk in
the _heap_info class."""
# NOTE(review): in this view the method has no indentation and several
# lines look truncated or missing: presumably a "continue" after the
# main arena test, the arguments of the second get_aligned_address call
# and of _search_first_chunk, the logging call openers and .format
# arguments of the messages, the alignment adjustment announced for
# I386, and the call whose arguments end the method. Restore from the
# upstream file.
heap_offset = self.profile.get_obj_size('_heap_info')
malloc_offset = self.profile.get_obj_size('malloc_state')
for arena in self.arenas:
# main_arena has no associated _heap_info structs
if arena.is_main_arena:
for heap in arena.heaps:
first_chunk_offset = heap.v() + heap_offset
# only the first heap area contains also the malloc_state
# the prev field for the first heap_info is 0x0
if heap.prev == 0x0:
first_chunk_offset += malloc_offset
# chunks are aligned, so in the case of non main_arenas, the
# address after the heap_info (and malloc_state) is probably
# not directly the first chunk but a few bytes after. So we
# try to find the first non-zero size_sz bytes.
# To prevent looking in the middle of a 8 byte size from a
# large chunk, we walk in steps of 8 bytes, as this is also
# the minimal alignment (32 bit)
first_chunk_offset = self.get_aligned_address(
first_chunk_offset, different_align_mask=7)
expected_first_chunk_offset = self.get_aligned_address(
first_chunk_offset = self._search_first_chunk(
if not first_chunk_offset:
"We couldn't identify the first chunk for a thread "
"arena. This is unexpected and will most probably lead"
" to unreliable results.")
# Normally, the first chunk is exactly the aligned address
# after the structs, but if we find it somewhere else, it is
# an indicator for another libc version (e.g. differing
# structs) that we don't have the correct vtypes for or
# another MALLOC_ALIGNMENT value
if first_chunk_offset != expected_first_chunk_offset:
"We identified an unexpected address deviation. The "
"first chunk for the current heap_info at 0x{:x} "
"started {:d} bytes further than expected. This "
"indicates another glibc version than the one we are "
"using or another value for MALLOC_ALIGNMENT. Verify "
"which version is used and provide the debug "
"information for that version. At the moment, "
"officially only those versions are supported when "
"not providing debug information for a specific "
"version: {:s}"
first_chunk_offset - expected_first_chunk_offset,
if self.session.profile.metadata("arch") == 'I386':
"We just try for now to adjust the "
"MALLOC_ALIGNMENT to 16 byte (instead of 8). This "
"might prevent further problems. If not, this "
"adjustments should be prevented to see, if "
"everything works anyways.")
heap.first_chunk = self.profile.malloc_chunk(
offset=first_chunk_offset, vm=self.process_as)
if arena.top_chunk != heap.first_chunk:
heap.first_chunk, heap.first_chunk.is_in_use())
def _initialize_mmapped_first_chunks(self):
"""Gathers the first chunk for each MMAPPED region and sets it on the
main_arena. First chunks for MMAPPED regions are only kept in the
main_arena, which is the first arena in the 'arenas' attribute of the
current class."""
# NOTE(review): in this view the method has no indentation and several
# lines look missing: the statements that fill heap_offsets (for the
# main arena's first chunk and for each heap), the vm= argument closing
# the malloc_chunk call, and the body executed when the mmapped chunk
# check succeeds. Restore from the upstream file.
# we first gather all vm_area offsets belonging to the main heap or
# thread heaps
heap_offsets = []
main_arena = self.get_main_arena()
if main_arena.first_chunk:
for arena in self.arenas:
for heap in arena.heaps:
# now we gather all vm_areas that do not contain a known
# heap_info struct
for vm_start, vm_end, vm_data in self.vmas:
name = vm_data['name']
# candidates: heap or potential mmapped vmas whose flags start with
# 'rw' and whose start is not already a known heap offset
if (name == self._heap_vma_identifier
or name == self._pot_mmapped_vma_identifier) \
and vm_data['vm_flags'].startswith('rw') \
and vm_start not in heap_offsets:
offset = self.get_aligned_address(vm_start)
mmap_chunk = self.profile.malloc_chunk(offset,
if self._check_and_report_mmapped_chunk(
mmap_chunk, (vm_start, vm_end, vm_data)):
def _initialize_heap_vma_list(self):
"""Searches for vmas that are known to belong to the heap and adds
them to the internal heap_vmas list."""
# NOTE(review): in this view the method has no indentation and the body
# of the loop over get_all_mmapped_chunks() is not visible; presumably
# it registers each mmapped chunk in heap_vmas -- confirm against the
# upstream file.
self.heap_vmas = utils.RangedCollection()
# 1. the vma(s) named like the main heap
for vm_start, vm_end, vm_data in self.vmas:
if vm_data['name'] == self._main_heap_identifier:
self.heap_vmas.insert(vm_start, vm_end, vm_data)
# 2. mmapped chunks
for mmap_chunk in self.get_all_mmapped_chunks():
# 3. the vmas holding the heaps of all non main arenas
for arena in self.arenas:
if not arena.is_main_arena:
for heap in arena.heaps:
vma = get_vma_for_offset(self.vmas, heap.v())
self.heap_vmas.insert(vma[0], vma[1], vma[2])
def _add_mmapped_chunk_to_heap_vma_list(self, mmapped_chunk):
# Builds a vma-data dict describing the given mmapped chunk (not a file,
# named 'mmapped_chunk').
# NOTE(review): in this view the method has no indentation and the call
# that consumes vm_data is truncated: only its middle argument (the end
# address of the chunk) is visible. Presumably it inserts the chunk's
# range into self.heap_vmas -- confirm against the upstream file.
vm_data = dict()
vm_data['is_file'] = False
vm_data['name'] = 'mmapped_chunk'
mmapped_chunk.v() + mmapped_chunk.chunksize(),
def _check_and_report_non_main_arena(self, chunk, chunk_in_use):
"""Checks the given chunk for the NON_MAIN_ARENA bit and prints a
warning if not set. This function should obviously only be used with
chunks not belonging to main_arena but also not for MMAPPED chunks
(they don't have the NON_MAIN_ARENA bit set)."""
# NOTE(review): in this view the method has no indentation and the
# logging call opener before the message appears to be missing.
# only chunks that are in use are expected to carry the bit
if chunk_in_use and not chunk.non_main_arena():
"Unexpected error: The non main arena chunk at offset 0x{0:x} "
"doesn't have the NON_MAIN_ARENA bit set.".format(chunk.v()))
def _log_mmapped_warning_messages(self, warning):
# NOTE(review): only the first condition of this method is visible in
# this view; the body (what happens with `warning` depending on whether
# self.mp_ is available) is missing and must be restored from the
# upstream file.
if not self.mp_:
# As there might be multiple scenarios, in which a vm_area is mistakenly
# treated as a mmapped region (see following warn messages for details),
# we strictly test for prev_size to be 0x0 (normally always the case for
# the first chunk in a memory region), the size to be != 0 and the mmapped
# bit to be set
def _check_and_report_mmapped_chunk(
self, mmap_chunk, mmap_vma, dont_report=False):
"""Checks the given chunk for various MMAPPED chunk specific
attributes. Depending on the results and the location of the chunk,
a info or warning is printed.

mmap_vma is indexed as (start, end, ...). With dont_report set, every
failing check returns False right away instead of building a message."""
# NOTE(review): in this view the method has no indentation and many
# lines look truncated or missing: the end of the base_string literal,
# the logging / _log_mmapped_warning_messages call openers before the
# "base_string + ..." arguments, the argument of several
# get_aligned_address and _check_mmap_alignment calls, and presumably
# the "else:" lines separating first-chunk from middle-chunk messages.
# Restore from the upstream file before relying on this code.
base_string = ("Current MMAPPED chunk at offset 0x{0:x} "
zero_first_chunk_error_reasons = (
"As this chunk resides at the beginning of the vm_area, "
"this fact might have multiple reasons: "
"1. It is part of a MMAPPED region but there are not yet any "
"allocated chunks. 2. The current vm_area is in fact the rest of "
"a dead thread stack or belongs to a mapped file, which is not "
"disginguishable from heap-vmas at the moment. "
"3. There might be an unexpected error. "
"In the first two cases, this warning can be considered harmless.")
zero_middle_chunk_error_reasons = (
"In the current case, this fact might have the following reasons: "
"1. It is the result from an MMAPPED region, which doesn't use "
"the whole space for its chunks (in this case harmless). "
"2. The current data belongs to an MMAPPED region, which shares "
"its vm_area with an mapped file or other data (also harmless). "
"3. It results from an accidently chosen vm_area to be part of "
"the heap (more specifically, to be an MMAPPED chunks region). "
"This can happen with old thread stacks or vm_areas of mapped "
"file and indicates an error and leads to wrong results. "
"4. An unexpected error (might lead to unrealiable results).")
first_chunk_error_reasons = (
"As this chunk resides at the beginning of the vm_area, "
"this fact might have the following reasons: "
"1. The current vm_area is in fact the rest of a dead thread "
"stack or belongs to a mapped file, which is not disginguishable "
"from heap-vmas at the moment. "
"2. There might be an unexpected error. "
"In the first case, this warning can be considered harmless.")
middle_chunk_error_reasons = (
"In the current case, this fact might have the following reasons: "
"1. The current data belongs to an MMAPPED region, which shares "
"its vm_area with an mapped file or other data (in this case "
"harmless). 2. It results from an accidently chosen vm_area to be "
"part of the heap (more specifically, to be an MMAPPED chunks "
"region). This can happen with old thread stacks or vm_areas of "
"mapped file and indicates an error and leads to wrong results. "
"3. An unexpected error (might lead to unrealiable results).")
mmap_vma_start = mmap_vma[0]
mmap_vma_end = mmap_vma[1]
# as the size for mmapped chunks is at least pagesize, we expect them
# to be >= 4096
# see glibc_2.23 malloc/malloc.c lines 2315 - 2318
# with a changed MALLOC_ALIGNMENT value, mmapped chunks can start a few
# bytes after the beginning of the memory region
# moreover, their prev_size field contains the gap size
# First check: prev_size equals the front gap, the size is page aligned
# (modulo the gap), at least one page and does not exceed the vma end.
if not (mmap_chunk.get_prev_size() == self._front_misalign_correction
and mmap_chunk.chunksize() % self._min_pagesize ==
(self._min_pagesize - self._front_misalign_correction)
% self._min_pagesize) or \
mmap_chunk.chunksize() < self._min_pagesize or \
mmap_chunk.v() + mmap_chunk.chunksize() > mmap_vma_end:
if dont_report:
return False
if mmap_chunk.get_prev_size() == 0 and mmap_chunk.get_size() == 0:
base_string += "has zero size. "
if mmap_chunk.v() == self.get_aligned_address(
# it is possible that a vm_area is marked as rw and does
# not contain a stack or heap or mmap region. we
# identified this case only when no threads are active
number_of_heap_vmas = 0
for _, _, vm_data in self.vmas:
if vm_data['name'] == self._heap_vma_identifier:
number_of_heap_vmas += 1
if number_of_heap_vmas <= 1 and len(self.arenas) == 1 \
and not self._are_there_any_threads():
base_string + "In this case, it seems "
"to be the result from a process with no threads "
"and a not yet used memory region, hence "
"indicating nothing abnormal.")
base_string + zero_first_chunk_error_reasons)
base_string + zero_middle_chunk_error_reasons)
base_string += "has invalid values. "
if mmap_chunk.v() == self.get_aligned_address(
mmap_vma_start): +
base_string + middle_chunk_error_reasons)
# Second check: neither PREV_INUSE nor NON_MAIN_ARENA may be set.
elif mmap_chunk.prev_inuse() or mmap_chunk.non_main_arena():
if dont_report:
return False
base_string += ("has either the prev_inuse or non_main_arena bit "
"set, which is normally not the case for MMAPPED "
if mmap_chunk.v() == self.get_aligned_address(mmap_vma_start):
base_string + first_chunk_error_reasons)
base_string + middle_chunk_error_reasons)
# Third check: the IS_MMAPPED bit has to be set.
elif not mmap_chunk.is_mmapped():
if dont_report:
return False
base_string += "doesn't have the is_mmapped bit set. "
if mmap_chunk.v() == self.get_aligned_address(mmap_vma_start):
base_string + first_chunk_error_reasons)
base_string + middle_chunk_error_reasons)
# Fourth check: alignment of the chunk.
elif not self._check_mmap_alignment(mmap_chunk.v() -
if dont_report:
return False
base_string + "is not aligned. As chunks are normally always "
"aligned, this indicates a mistakenly chosen mmapped chunk "
"and probably results in wrong results.")
# everything is ok
return True
return False
def _are_there_any_threads(self):
"""This function searches for vmas containing the stack for a thread
and returns True if it finds at least one."""
# NOTE(review): in this view the method has no indentation and the left
# operand of the ">= 2" comparison is missing; according to the comment
# below it is presumably the task's mm_users counter -- confirm against
# the upstream file.
# mm_users holds the number of mm_struct users. when a thread is
# created, he gets hands on the mm_struct and the counter is
# increased: mm_users >= 2 means there are threads
if >= 2:
return True
# if the first test fails, we still look for thread stack segments
for _, _, vm_data in self.vmas:
if vm_data['name'].startswith('[stack:'):
return True
return False
def iterate_through_chunks(self, first_chunk, mem_end, only_free=False,
only_alloc=False, return_last_chunk=False):
"""This function iterates chunk after chunk until hitting mem_end.
Tests for allocation status are not made via bins/fastbins but with
chunk flags. Note: This function will not return the last chunk, if
only_free or/and only_alloc is set (unless return_last_chunk is set)
as there is no PREV_INUSE bit which could be tested.

This is a generator: chunks are yielded one by one."""
# NOTE(review): in this view the method has no indentation and several
# lines look truncated or missing: the arguments closing the first
# next_chunk_generator( call, and presumably the "else:", "continue"
# and "break" lines that separate the ordinary-chunk case from the
# last-chunk case in both loops. Restore from the upstream file.
# Without a filter, every chunk is yielded.
if not (only_free or only_alloc):
for curr_chunk in first_chunk.next_chunk_generator(
if (curr_chunk.v() + curr_chunk.chunksize()) < mem_end:
yield curr_chunk
yield curr_chunk
# With a filter, the walk looks one chunk ahead: the PREV_INUSE bit of
# next_chunk describes the allocation status of curr_chunk.
curr_chunk = None
for next_chunk in first_chunk.next_chunk_generator():
if not curr_chunk:
curr_chunk = next_chunk
if (curr_chunk.v() + curr_chunk.chunksize()) < mem_end:
is_in_use = next_chunk.prev_inuse()
if only_free and not is_in_use or \
only_alloc and is_in_use:
yield curr_chunk
# we hit last/top chunk. as there is no following chunk, we
# can't examine the PREV_INUSE bit
if return_last_chunk:
yield curr_chunk
curr_chunk = next_chunk
def _offset_in_heap_range(self, offset):
"""Returns true if the given offset resides in a vma potentially
belonging to the heap. This function is only used while carving for
the main arena and hence can not use the later on generated internal
heap_vmas list."""
for vm_start, vm_end, vm_data in self.vmas:
if vm_start <= offset < vm_end:
name = vm_data['name']
if name == self._main_heap_identifier \
or name == self._heap_vma_identifier:
return True
return False
def _carve_main_arena(self):
"""Calling this method means that we don't have debug information (in
the sense of constant offsets for data structures) for the target libc
implementation and do not know the location of the main_arena. If the
current task contains threads however, we are able to get the location
of the main_arena. If there are no threads, we still are able to locate
the main_arena by following the fd/bk pointers in freed chunks.
The last attempt is done by walking the chunks of the main heap until
the top chunk is hit. As the main arena keeps a pointer to this chunk,
we simply search all memory regions for pointers.
This method returns either the main_arena or None."""
# NOTE(review): in this view the method has no indentation and many
# lines look truncated or missing, among them: the call announced by the
# comment after the profile check, the logging call openers before the
# bare message strings, the vm= argument closing the _heap_info call,
# the statements that append to good_arenas / bad_arenas, the second
# summand of several "main_heap_range[0] +" expressions, the remaining
# arguments of iterate_through_chunks, the memory read assigned to temp,
# the right-hand sides of the top-chunk comparisons, the operands of the
# "top_chunk ==" and "== pot_main_arena" tests, and presumably several
# "else:" / "break" lines. Restore from the upstream file before relying
# on this code.
if not self._libc_profile_success:
self.session.logging.error("No libc profile with rudimentary "
"struct information available.")
return None
# this function has not yet been called, as it normally depends on an
# initialized main_arena to function fully
# as we don't have any main arena yet, we try to call it without a
# main arena, which in most cases should work
libc_range = get_libc_range(self.vmas)
# ---- Method 1: via heap_info structs of thread arenas ----
if self._are_there_any_threads():
"As there are threads, we try to gather the main_arena "
"via the _heap_info structs.")
"We first try to gather the main_arena via died thread "
"heaps, assuming there are any.")
good_arenas = []
# bad arenas don't loop with their next pointer within the maximum
# number of arenas for the current number of cores and the architecture
# see _check_arenas
bad_arenas = []
# first we try to find a heap_info struct whose ar_ptr points right
# after itself this is the case for the first vm_area containing the
# first heap_info and the according malloc_state struct
for vm_start, vm_end, vm_data in self.vmas:
if vm_data['name'] == self._heap_vma_identifier \
or vm_data['name'] == self._pot_mmapped_vma_identifier:
heap_info = self.profile._heap_info(offset=vm_start,
# we try to find a heap_info struct which is followed by a
# malloc_state. The prev member of the first _heap_info struct
# (which is the one followed by the malloc_state struct) is 0x0
heap_info_size = self.profile.get_obj_size('_heap_info')
if vm_start <= heap_info.ar_ptr.v() <= vm_end:
heap_info_address = self.get_aligned_address(
heap_info_size + vm_start)
if heap_info.ar_ptr.v() == heap_info_address \
and heap_info.prev.v() == 0x0:
arena = heap_info.ar_ptr
# swap checking is deactivated here; True and None both count as
# consistent
arena_consistency = self._check_arenas(
arena, deactivate_swap_check=True)
if arena_consistency is True or arena_consistency \
is None:
reached_bad_arenas = False
# now we try to use the potential arenas to find the main_arena
# located in the libc
for arena_list in good_arenas, bad_arenas:
for arena in arena_list:
for pot_main_arena in arena.walk_list('next'):
# with a known libc range the arena has to lie inside it; without
# one, any arena outside of the heap vmas is accepted
if libc_range and libc_range[0] <= pot_main_arena.v() \
<= libc_range[1] or not libc_range and \
not self._offset_in_heap_range(pot_main_arena.v()):
if reached_bad_arenas:
"The arena pointers for the gathered "
"main_arena don't seem to loop. The reason "
"might be wrong arena pointers and probably "
"leads to unreliable results.")
"We most probably found the main_arena via "
"heap_info structs")
self._carve_method = 'heap_info'
return pot_main_arena
reached_bad_arenas = True
"It doesn't seem like the task with pid {0:d} has any threads, "
"and as we don't have have the main arena offset, we now try to "
"find freed chunks and with them the location of the main_arena."
# ---- Method 2: via bk pointers of freed chunks ----
# the previous method didn't work so we now try to gather the main
# arena via freed chunks
main_heap_range = get_mem_range_for_regex(
self.vmas, re.escape(self._main_heap_identifier))
if not main_heap_range:
return None
offset = self.get_aligned_address(main_heap_range[0] +
first_chunk = self.profile.malloc_chunk(offset, vm=self.process_as)
offset_to_top = self.profile.get_obj_offset("malloc_state", "top")
# not used right here, but part of the next method of carving the
# main arena
top_chunk = None
for free_chunk in self.iterate_through_chunks(first_chunk,
top_chunk = free_chunk
# we now try to follow the bk links to get to the main_arena
for curr_free_chunk in free_chunk.walk_list('bk'):
if libc_range and libc_range[0] <= curr_free_chunk.v() \
<= libc_range[1] or not libc_range and \
not self._offset_in_heap_range(curr_free_chunk.v()):
# we are now within the main_arena and try
# to find the top chunk by going backwards
offset_to_binmap = self.profile.get_obj_offset(
"malloc_state", "binmap")
maximum_offset_to_top = offset_to_binmap - offset_to_top
curr_off = curr_free_chunk.v()
fmt = 'I' if self._pointer_size == 4 else 'Q'
# as between the bins and top are only pointers, walking in
# size_sz steps should be no problem
for i in list(range(
0, maximum_offset_to_top, self._pointer_size)):
temp = - i,
temp = struct.unpack(fmt, temp)[0]
# a pointer into the main heap is a candidate for the top pointer
if main_heap_range[0] <= temp <= main_heap_range[1]:
pot_top = self.profile.malloc_chunk(
offset=temp, vm=self.process_as)
if pot_top.v() + pot_top.chunksize() == \
# we hit top chunk
"We found the main_arena via a freed "
self._carve_method = 'freed_chunk'
return self.profile.malloc_state(
offset=(curr_off - i) - offset_to_top,
# ---- Method 3: via pointers to the top chunk ----
# Ending up here means all previous methods were not able to find the
# main arena. The last method we try at this point is to search for
# pointers to the top chunk. At least the main_arena should have a
# pointer to the top chunk
"We couldn't identify any freed chunk leading to the main arena. "
"So the last approach is to search for the top chunk, and then "
"for pointers to it within the loaded libc module.")
if top_chunk and top_chunk.v() + top_chunk.chunksize() == \
# we most probably found our top chunk and now search for pointers
# to it
for hit in self.search_vmas_for_needle(pointers=[top_chunk.v()]):
pot_main_arena = self.profile.malloc_state(
offset=hit['hit'] - offset_to_top, vm=self.process_as)
if top_chunk == and \
pot_main_arena.system_mem == (top_chunk.v() +
top_chunk.chunksize() -
(main_heap_range[0] +
# as the 'thread arena carving' method didn't find an
# arena, the 'next' field should point to itself
if == pot_main_arena:
"We found the main_arena via top chunk.")
self._carve_method = 'top_chunk'
return pot_main_arena
arena_consistency = self._check_arenas(
pot_main_arena, deactivate_swap_check=True)
if arena_consistency is True or arena_consistency \
is None:
"We found the main_arena via top chunk.")
self._carve_method = 'top_chunk'
return pot_main_arena
return None
def _reset(self):
    """Puts the HeapAnalysis instance back into its initial state, so it
    can be used for the analysis of another process."""

    # Each entry is (attribute name, initial value), listed in the order in
    # which the attributes get assigned. The tuple is built anew on every
    # call, hence the list, set and dict objects are never shared between
    # two resets.
    initial_state = (
        ('_libc_profile_success', False),
        ('_libc_offset', None),
        ('process_as', None),
        ('arenas', []),
        ('vmas', None),
        ('heap_vmas', None),
        ('mp_', None),
        ('_mmapped_warnings', set()),
        ('task', None),
        ('statistics', None),
        ('_heap_slack_space', {}),
        ('_has_dummy_arena', False),
        ('_static_bin_first_chunk_dist', 0),
        ('_is_probably_static_bin', False),
        ('_front_misalign_correction', 0),
        ('_bottom_chunk_gaps', {}),
        ('_malloc_alignment', 0),
        ('_malloc_align_mask', 0),
        ('_libc_version', ''),
        ('_carve_method', ''),
    )

    for attribute, value in initial_state:
        setattr(self, attribute, value)
def _check_and_report_arena_for_being_swapped(self, arena):
    """Tests some crucial fields of an arena for being swapped.

    The fields top, next and system_mem of the given malloc_state object
    are examined. As soon as one of them is null, the address of each of
    these fields is tested with _check_address_for_being_swapped and a
    warning naming the most probable reason is logged.

    Returns True if no arena has been given or if one of the fields belongs
    to a swapped page. Returns False in every other case (including the one
    where fields are null but no swapped page has been detected).
    """

    if not arena:
        # as we don't have any object to analyze further, we just return
        # true in this case
        return True

    crucial_fields = ("top", "next", "system_mem")

    # NOTE(review): in the damaged copy of this method the first two
    # operands of this test, the field argument of get_obj_offset and the
    # call heads of both messages were missing. The operands and the
    # argument follow from the field list; the messages are restored as
    # warnings on the session logger - confirm the log level against the
    # upstream file.
    if any(getattr(arena, field).v() == 0 for field in crucial_fields):

        for field in crucial_fields:
            address_to_test = arena.v()
            address_to_test += self.profile.get_obj_offset("malloc_state",
                                                           field)

            if self._check_address_for_being_swapped(address_to_test):
                # at least parts of the arena have been swapped
                self.session.logging.warning(
                    "Some crucial fields of the arena at offset 0x{:x} "
                    "belong to swapped pages. Hence their values can't be "
                    "retrieved, which will lead to more errors and "
                    "unreliable results.".format(arena.v()))
                return True

        # none of the fields lies on a swapped page, so the null values
        # have a different reason
        self.session.logging.warning(
            "Some crucial fields of the arena at offset 0x{:x} are null. "
            "The reason might be a wrong offset to the "
            "main arena, a statically linked binary or a fundamental "
            "error in this plugin. Either way, the results will most "
            "probably be incorrect and incomplete.".format(arena.v()))

    return False
def _check_and_report_mp_for_being_swapped(self, malloc_par_struct):
    """Tests a field of the malloc_par struct for being swapped.

    Only the mmap_threshold field is examined. If it is null, its address
    is tested with _check_address_for_being_swapped and a warning naming
    the most probable reason is logged.

    Returns True if no struct has been given or if the field belongs to a
    swapped page. Returns False in every other case (including the one
    where the field is null but no swapped page has been detected).
    """

    if not malloc_par_struct:
        # as we don't have any object to analyze further, we just return
        # true in this case
        return True

    elif malloc_par_struct.mmap_threshold.v() == 0:
        # NOTE(review): in the damaged copy of this method the field
        # argument of get_obj_offset, the call heads of both messages and
        # the format call of the first message were missing. The argument
        # follows from the field tested above; the messages are restored
        # as warnings on the session logger - confirm the log level
        # against the upstream file.
        address_to_test = malloc_par_struct.v()
        address_to_test += self.profile.get_obj_offset("malloc_par",
                                                       "mmap_threshold")

        if self._check_address_for_being_swapped(address_to_test):
            self.session.logging.warning(
                "The page containing the malloc_par struct at offset "
                "0x{:x} has been swapped. The MMAPPED chunk algorithms "
                "will hence not work perfectly and some MMAPPED chunks "
                "might be missing in the output."
                .format(malloc_par_struct.v()))
            return True

        self.session.logging.warning(
            "The mmap_threshold field of the malloc_par struct at "
            "offset 0x{:x} is null, BUT the corresponding page "
            "doesn't seem to be swapped. The reason might be a wrong "
            "offset to the malloc_par struct, a statically linked "
            "binary or a fundamental error in this plugin. Either "
            "way, the MMAPPED chunk algorithms will not work "
            "perfectly, hence some MMAPPED chunks might be missing "
            "in the output.".format(malloc_par_struct.v()))

    return False
def init_for_task(self, task):
"""Initializes the process address space and malloc_par struct and
calls initialize_*. Should be the first method to be called for each
task.

Returns True if everything seems to have gone fine and False otherwise."""
# NOTE(review): this listing has evidently lost lines of the method (the
# call heads that the dangling message strings belong to, several else
# branches and some closing arguments). The comments added here only
# describe what is still visible; restore the method from the upstream
# file before relying on its control flow.
# processes normally have an associated mm_struct/memory descriptor
# if there is none, it is probably a kernel thread
self.task = task
self.process_as = task.get_process_address_space()
self.vmas = self._get_vmas_for_task(task)
if self.vmas:
if self._libc_profile_success:
# as these values are used on various locations,
# we gather them only once
self._chunk_fd_member_offset = \
self.profile.get_obj_offset("malloc_chunk", "fd")
self._pointer_size = self.profile.get_obj_size("Pointer")
# the first element of the range returned by get_libc_range is kept
# as the libc offset
libc_range = get_libc_range(self.vmas)
if libc_range:
self._libc_offset = libc_range[0]
"Found libc offset at: " + hex(self._libc_offset))
if not self._libc_offset:
# might be a statically linked executable
"Didn't find the libc filename in the vm_areas of "
"the current process: {:d} - {:s} . The reason "
"might be a statically linked binary or an "
"unexpected error. We try to fix this issue for "
"the first case. Without any follow up warnings, "
"everything seems to be fine."
.format(, repr(self.task.comm.v())))
self._is_probably_static_bin = True
pot_main_arena = None
# an offset given via the main_arena plugin argument is taken as is;
# presumably the profile constant below is only the fallback (the
# branch keyword is missing in this listing)
if self.plugin_args.main_arena:
main_arena_offset = self.plugin_args.main_arena
main_arena_offset = self.profile.get_constant(
if main_arena_offset:
# with a known libc offset, main_arena_offset gets the libc offset
# added before the malloc_state object is created
if self._libc_offset:
main_arena_offset += self._libc_offset
pot_main_arena = self.profile.malloc_state(
offset=(main_arena_offset), profile=self.profile,
"As it seems like we don't have debug information "
"for the main arena, we now try to retrieve the "
"main_arena via some different techniques for pid "
pot_main_arena = self._carve_main_arena()
if not pot_main_arena and \
# We redefined the malloc_state struct and with
# this new struct, we search for the main_arena
# again. See _test__load_special_glibc_profile for
# further comments
pot_main_arena = self._carve_main_arena()
if not pot_main_arena:
# This will most probably only happen, if the page
# containing the main arena has been swapped
"We were not able to find the main arena for "
"task {0:d} and since we have no debug "
"information about its offset, we can't "
"retrieve it directly.".format(
pot_main_arena = \
if pot_main_arena:
if self._test_and_load_special_glibc_profile(
# seems like the malloc_state definition has been
# changed (see _test__load_special_glibc_profile
# for further comments), so we reinitialize our
# arena
offset = pot_main_arena.v()
if not self.plugin_args.main_arena and \
self._carve_method in \
['top_chunk', 'freed_chunk']:
# If the absolute offset for the main_arena
# has been given, we don't touch the offset.
# If not, and the main_arena has been gathered
# via the top chunk, we subtract 8 bytes from
# the previous offset, since this offset has
# been calculated with a wrong 'top' offset
# within the malloc state struct (the
# additional have_fastchunks field lies before
# the top chunk).
offset -= 8
pot_main_arena = self.profile.malloc_state(
offset=offset, vm=self.process_as)
pot_main_arena = \
if self._check_arenas(pot_main_arena) is False:
"Arena pointers don't seem to loop within the "
"expected range. Maybe the main_arena pointer "
"is wrong. This might lead to unreliable "
# despite potential problems, we try to proceed
# no main_arena could be found, so we simply walk
# the main_heap for chunks
"No main_arena could be found, so we simply try to"
" walk the chunks in the main heap. Without the "
"arena, fastbin chunks can't be recognized "
"reliably, and hence are treated as allocated "
"chunks. This is especially a problem on further "
"analysis (e.g. dumping their content).")
return True
"Libc profile is not loaded, "
"hence no struct or constant information. Aborting")
"No vm_areas could be extracted from current task (maybe "
"kernel thread): {:s} (PID: {:d})"
"Current task seems to be a kernel thread. Skipping Task: "
"{:s} (PID: {:d})".format(repr(task.comm.v()),
return False
def _test_and_load_special_glibc_profile(self, arena):
"""Arch linux uses for certain versions of their Glibc package (at
least 2.26-8) more recent code from the Glibc git repository, which is
not part of the official Glibc version 2.26 release. So if libc version
2.26 is used and something previously failed (which is why this
gets called), we load the malloc_state definition for the upcoming
Glibc 2.27 version.

arena is the main arena candidate whose fields and next pointers get
examined.

This function returns True only if a new profile has been loaded.
This means, it will also return False if the new profile has already
been loaded."""
# NOTE(review): this listing has evidently lost lines of the method (the
# call heads of the message strings, the statements that actually load the
# new profile and parts of two expressions). The comments added here only
# describe what is still visible; restore the method from the upstream
# file before relying on it.
# the only currently supported/known case is arch's glibc package
# 2.26-8 for x64 platforms
if not (self._libc_version == '226' and
self.session.profile.metadata("arch") == 'AMD64'):
return False
# we first test, whether the 2.27 definitions already have been
# loaded
if hasattr(self.profile.malloc_state(), 'have_fastchunks'):
return False
# first indicator: fields of the given arena are null (two operands of
# this test are missing in this listing)
if arena and == 0 and == 0 and \
arena.system_mem.v() == 0:
"Loading special profile as some fields "
"of the identified main_arena were null. Typically the "
"case for arch linux x64, with glibc package version "
">= 2.26-8")
return True
# second indicator: the arena chain is followed for at most 0x100 steps;
# getting back to the given arena means the pointers loop and no special
# profile is needed (presumably curr_arena is advanced via its next
# pointer, the right-hand side is missing in this listing)
curr_arena = arena
for _ in range(0x100):
curr_arena =
if arena == curr_arena:
return False
"Loading special profile as the next "
"pointers of the arenas don't loop. Typically the "
"case for arch linux x64, with glibc package version >= 2.26-8")
return True
def _initialize_tcache_bins(self):
"""Looks at the heap_info objects of the main arena in self.arenas and
selects those whose prev field is null.

NOTE(review): the statements executed for the selected heap_info objects
are missing in this listing, so what gets initialized can't be stated
here; restore the method from the upstream file."""
# the first chunk for an arena contains the
# tcache_perthread_struct (for glibc versions >= 2.26)
for arena in self.arenas:
if arena.is_main_arena:
for heap_info in arena.heaps:
# a heap_info without a predecessor, presumably the first heap of the
# arena
if heap_info.prev == 0x0:
def _check_and_correct_first_chunk_offset(self):
"""This function checks for a gap between the beginning of the main
heap's memory region and the first chunk. If there is one, it either
means the binary has been linked statically or there is a different
MALLOC_ALIGNMENT set via compile time options or an unknown error.
In the first two cases, this function tries to correct the according
attributes (_static_bin_first_chunk_dist, _front_misalign_correction
and _malloc_alignment).

NOTE(review): this listing has evidently lost lines of the method (the end
of the original docstring, a return, the right operands of two line
continuations and the call heads of the message strings); restore the
method from the upstream file before relying on it.
"""
main_arena_range = get_mem_range_for_regex(
self.vmas, re.escape(self._main_heap_identifier))
if not main_arena_range:
# there seems to be no main heap, so nothing to do here
if self._is_probably_static_bin and self.mp_:
# the beginning of the chunk area is pointed to by mp_.sbrk_base
self._static_bin_first_chunk_dist = \
self.mp_.sbrk_base.v() - main_arena_range[0]
heap_beginning = main_arena_range[0] + \
# when the MALLOC_ALIGNMENT has been modified, the first chunk does
# not start at the beginning of the memory region, but a few bytes
# after.
first_chunk_offset = self._search_first_chunk(heap_beginning)
if first_chunk_offset and first_chunk_offset > heap_beginning:
"We identified a gap between the beginning of the main heap "
"and the first chunk. This indicates a different "
"MALLOC_ALIGNMENT value, which we now try to fix.")
self._front_misalign_correction = first_chunk_offset - \
# the only supported case currently is an increase from 8 to 16
if self.session.profile.metadata("arch") == 'I386' and \
self._front_misalign_correction == 8:
self._malloc_alignment = 16
"There seems to be a different MALLOC_ALIGNMENT in a case "
"that we not yet support. So get in touch with the creator"
" of this plugin and send him a nice 'WTF, fix this' ; )")
def _walk_hidden_mmapped_chunks(self, hidden_chunk):
"""Helper function for carve_and_register_hidden_mmapped_chunks.
Walks MMAPPED chunks beginning with hidden_chunk and registers them.

Returns the list new_mmapped_chunks.

NOTE(review): the argument and the body of the loop below are missing in
this listing, so how the chunks get collected and registered can't be
stated here; restore the method from the upstream file.
"""
new_mmapped_chunks = []
# verification steps are triggered
# in allocated_chunks_for_mmapped_chunk
if hidden_chunk:
# chunks already contained in the main arena's mmapped_first_chunks are
# not walked again
if hidden_chunk not in self.get_main_arena().mmapped_first_chunks:
for mmapped_chunk in self._allocated_chunks_for_mmapped_chunk(
return new_mmapped_chunks
def _carve_register_mmapped_chunks_hidden_behind_stack(self):
"""Tries to find hidden MMAPPED chunks behind stack segemts."""
# list of new mmapped chunks lists (first and following chunks)
new_mmapped_chunks = []
for vma in self.vmas: