Skip to content
Browse files

Merge branch 'dev' into dev3

  • Loading branch information...
Arusekk committed Nov 5, 2019
2 parents e0af5e8 + c3083d8 commit 714ae6ab69219081de89aaa3d019ec6190a549b3
@@ -2,6 +2,7 @@

from pwn import *
from glob import glob
from pwnlib.elf.maps import CAT_PROC_MAPS_EXIT

:mod:`pwnlib.elf.elf` --- ELF Files
@@ -21,7 +21,7 @@
from pwnlib import *
from pwnlib.asm import *
from pwnlib.context import Thread
from pwnlib.context import context
from pwnlib.context import context, LocalContext
from pwnlib.dynelf import DynELF
from pwnlib.encoders import *
from pwnlib.elf.corefile import Core, Corefile, Coredump
@@ -397,6 +397,7 @@ class ContextType(object):
'sparc64': big_64,
'thumb': little_32,
'vax': little_32,
'none': {},

#: Valid values for :attr:`endian`
@@ -1422,11 +1423,39 @@ def LocalContext(function):
def setter(*a, **kw):
# Fast path to skip adding a Context frame
if not kw:
return function(*a)
with context.local(**{k:kw.pop(k) for k,v in tuple(kw.items()) if isinstance(getattr(ContextType, k, None), property)}):
arch = context.arch
bits = context.bits
endian = context.endian

with context.local(**{k:kw.pop(k) for k,v in list(kw.items()) if isinstance(getattr(ContextType, k, None), property)}):
# Prevent the user from doing silly things with invalid
# architecture / bits / endianness combinations.
if (arch == 'i386' and bits != 32) \
or (arch == 'amd64' and bits != 64):
raise AttributeError("Invalid arch/bits combination: %s/%s" % (arch, bits))

if arch in ('i386', 'amd64') and endian == 'big':
raise AttributeError("Invalid arch/endianness combination: %s/%s" % (arch, endian))

return function(*a, **kw)
return setter

def LocalNoarchContext(function):
Same as LocalContext, but resets arch to :const:`'none'` by default
>>> @LocalNoarchContext
... def printArch():
... print(context.arch)
>>> printArch()
def setter(*a, **kw):
kw.setdefault('arch', 'none')
with context.local(**{k:kw.pop(k) for k,v in tuple(kw.items()) if isinstance(getattr(ContextType, k, None), property)}):
return function(*a, **kw)
return setter

@@ -1448,6 +1477,14 @@ def update_context_defaults(section):
log.warn("Unsupported configuration option %r in section %r" % (key, 'context'))

ContextType.defaults[key] = type(default)(value)
# Attempt to set the value, to see if it is value:
with context.local(**{key: value}):
value = getattr(context, key)
except (ValueError, AttributeError) as e:
log.warn("Could not set context.%s=%s via pwn.conf (%s)", key, section[key], e)

ContextType.defaults[key] = value

register_config('context', update_context_defaults)
@@ -10,6 +10,7 @@
from pwnlib.elf.datatypes import *
from pwnlib.elf.elf import ELF
from pwnlib.elf.elf import load
from pwnlib.elf import maps
from pwnlib.elf import plt

__all__ = ['load', 'ELF', 'Core'] + sorted(filter(lambda x: not x.startswith('_'), datatypes.__dict__.keys()))
@@ -43,6 +43,7 @@
import re
import six
import subprocess
import tempfile

from six import BytesIO

@@ -74,12 +75,14 @@
from pwnlib.elf.config import kernel_configuration
from pwnlib.elf.config import parse_kconfig
from pwnlib.elf.datatypes import constants
from pwnlib.elf.maps import CAT_PROC_MAPS_EXIT
from pwnlib.elf.plt import emulate_plt_instructions
from pwnlib.log import getLogger
from pwnlib.term import text
from pwnlib.tubes.process import process
from pwnlib.util import misc
from pwnlib.util import packing
from pwnlib.util.fiddling import unhex
from pwnlib.util.sh_string import sh_string

log = getLogger(__name__)
@@ -349,13 +352,15 @@ def __init__(self, path, checksec=True):
log.warn("Could not populate PLT: %s", e)


if checksec:

self._libs = None
self._maps = None

def from_assembly(assembly, *a, **kw):
@@ -496,6 +501,41 @@ def iter_segments_by_type(self, t):
if t == seg.header.p_type or t in str(seg.header.p_type):
yield seg

def get_segment_for_address(self, address, size=1):
"""get_segment_for_address(address, size=1) -> Segment
Given a virtual address described by a ``PT_LOAD`` segment, return the
first segment which describes the virtual address. An optional ``size``
may be provided to ensure the entire range falls into the same segment.
address(int): Virtual address to find
size(int): Number of bytes which must be available after ``address``
in **both** the file-backed data for the segment, and the memory
region which is reserved for the data.
Either returns a :class:`.segments.Segment` object, or ``None``.
for seg in self.iter_segments_by_type("PT_LOAD"):
mem_start = seg.header.p_vaddr
mem_stop = seg.header.p_memsz + mem_start

if not (mem_start <= address <= address+size < mem_stop):

offset = self.vaddr_to_offset(address)

file_start = seg.header.p_offset
file_stop = seg.header.p_filesz + file_start

if not (file_start <= offset <= offset+size < file_stop):

return seg

return None

def sections(self):
@@ -620,6 +660,20 @@ def non_writable_segments(self):
return [s for s in self.segments if not s.header.p_flags & P_FLAGS.PF_W]

def libs(self):
"""Dictionary of {path: address} for every library loaded for this ELF."""
if self._libs is None:
return self._libs

def maps(self):
"""Dictionary of {name: address} for every mapping in this ELF's address space."""
if self._maps is None:
return self._maps

def libc(self):
""":class:`.ELF`: If this :class:`.ELF` imports any libraries which contain ``'libc[.-]``,
@@ -641,40 +695,140 @@ def _populate_libraries(self):
>>> any(map(lambda x: 'libc' in x, bash.libs.keys()))
# Patch some shellcode into the ELF and run it.
maps = self._patch_elf_and_read_maps()

# We need a .dynamic section for dynamically linked libraries
if not self.get_section_by_name('.dynamic') or self.statically_linked:
self.libs= {}
self._maps = maps
self._libs = {}

# We must also specify a 'PT_INTERP', otherwise it's a 'statically-linked'
# binary which is also position-independent (and as such has a .dynamic).
for segment in self.iter_segments_by_type('PT_INTERP'):
self.libs = {}
for lib, address in maps.items():

# Filter out [stack] and such from the library listings
if lib.startswith('['):

# Any existing files we can just use
if os.path.exists(lib):
self._libs[lib] = address

# Try etc/qemu-binfmt, as per Ubuntu
if not self.native:
ld_prefix = qemu.ld_prefix()

qemu_lib = os.path.join(ld_prefix, lib)
qemu_lib = os.path.realpath(qemu_lib)

if os.path.exists(qemu_lib):
self._libs[qemu_lib] = address

def _patch_elf_and_read_maps(self):
"""patch_elf_and_read_maps(self) -> dict
Read ``/proc/self/maps`` as if the ELF were executing.
This is done by replacing the code at the entry point with shellcode which
dumps ``/proc/self/maps`` and exits, and **actually executing the binary**.
A ``dict`` mapping file paths to the lowest address they appear at.
Does not do any translation for e.g. QEMU emulation, the raw results
are returned.
If there is not enough space to inject the shellcode in the segment
which contains the entry point, returns ``{}``.
These tests are just to ensure that our shellcode is correct.
>>> for arch in CAT_PROC_MAPS_EXIT:
... with context.local(arch=arch):
... sc ="/proc/self/maps")
... sc += shellcraft.exit()
... sc = asm(sc)
... sc = enhex(sc)
... assert sc == CAT_PROC_MAPS_EXIT[arch]

# Get our shellcode
sc = CAT_PROC_MAPS_EXIT.get(self.arch, None)

if sc is None:
log.error("Cannot patch /proc/self/maps shellcode into %r binary", self.arch)

sc = unhex(sc)

# Ensure there is enough room in the segment where the entry point resides
# in order to inject our shellcode.
seg = self.get_segment_for_address(self.entry, len(sc))
if not seg:
log.warn_once("Could not inject code to determine memory mapping for %r: Not enough space", self)
return {}

# Create our temporary file
# NOTE: We cannot use "with NamedTemporaryFile() as foo", because we cannot
# execute the file while the handle is open.
fd, path = tempfile.mkstemp()

# Close the file descriptor so that it may be executed

# Save off a copy of the ELF

# Load a new copy of the ELF at the temporary file location
old =, len(sc))
cmd = 'ulimit -s unlimited; LD_TRACE_LOADED_OBJECTS=1 LD_WARN=1 LD_BIND_NOW=1 %s 2>/dev/null' % sh_string(self.path)
self.write(self.entry, sc)
# Restore the original contents
self.write(self.entry, old)

data = subprocess.check_output(cmd, shell = True, stderr = subprocess.STDOUT, universal_newlines = True)
libs = misc.parse_ldd_output(data)
# Make the file executable
os.chmod(path, 0o755)

for lib in dict(libs):
if os.path.exists(lib):
# Run a copy of it, get the maps
with context.silent:
io = process(path)
data = io.recvall(timeout=2)
except Exception:
log.warn_once("Injected /proc/self/maps code did not execute correctly")
return {}

# Swap in the original ELF name
data = data.replace(path, self.path)

# All we care about in the data is the load address of each file-backed mapping,
# or each kernel-supplied mapping.
# For quick reference, the data looks like this:
# 7fcb025f2000-7fcb025f3000 r--p 00025000 fe:01 3025685 /lib/x86_64-linux-gnu/
# 7fcb025f3000-7fcb025f4000 rw-p 00026000 fe:01 3025685 /lib/x86_64-linux-gnu/
# 7fcb025f4000-7fcb025f5000 rw-p 00000000 00:00 0
# 7ffe39cd4000-7ffe39cf6000 rw-p 00000000 00:00 0 [stack]
# 7ffe39d05000-7ffe39d07000 r--p 00000000 00:00 0 [vvar]
result = {}
for line in data.splitlines():
if '/' in line:
index = line.index('/')
elif '[' in line:
index = line.index('[')

address, _ = line.split('-', 1)

address = int(address, 0x10)
name = line[index:]

if not self.native:
ld_prefix = qemu.ld_prefix()
qemu_lib = os.path.exists(os.path.join(ld_prefix, lib))
if qemu_lib:
libs[os.path.realpath(qemu_lib)] = libs.pop(lib)
result.setdefault(name, address)

self.libs = libs
# Remove the temporary file, best-effort

except subprocess.CalledProcessError:
self.libs = {}
return result

def _populate_functions(self):
"""Builds a dict of 'functions' (i.e. symbols of type 'STT_FUNC')
@@ -0,0 +1,27 @@
from __future__ import absolute_import

# Pre-assembled shellcode for each architecture.
# This is literally the output of:
# shellcraft $ /proc/self/maps
# shellcraft $ARCH.linux.syscalls.exit 0

0 comments on commit 714ae6a

Please sign in to comment.
You can’t perform that action at this time.