Skip to content
Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
4927 lines (3961 sloc) 180 KB
#!/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2009-2018, Mario Vilas
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice,this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Process instrumentation.
@group Instrumentation:
Process
"""
from __future__ import with_statement
# FIXME
# I've been told the host process for the latest versions of VMWare
# can't be instrumented, because they try to stop code injection into the VMs.
# The solution appears to be to run the debugger from a user account that
# belongs to the VMware group. I haven't confirmed this yet.
__all__ = ['Process']
import win32
from textio import HexDump, HexInput
from util import Regenerator, PathOperations, MemoryAddresses
from module import _ModuleContainer
from thread import Thread, _ThreadContainer
from search import Search, \
Pattern, StringPattern, IStringPattern, HexPattern
from disasm import Disassembler
import re
import ctypes
import ntpath
import struct
import warnings
import traceback
from os import getenv
# Cygwin compatibility.
try:
WindowsError
except NameError:
from winappdbg.win32 import WindowsError, getenv # NOQA
# delayed import
System = None
#==============================================================================
# TODO
# * Remote GetLastError()
# * The memory operation methods do not take into account that code breakpoints
# change the memory. This object should talk to BreakpointContainer to
# retrieve the original memory contents where code breakpoints are enabled.
# * A memory cache could be implemented here.
class Process (_ThreadContainer, _ModuleContainer):
"""
Interface to a process. Contains threads and modules snapshots.
@group Properties:
get_pid, is_alive, is_debugged, is_wow64, get_arch, get_bits,
get_filename, get_exit_code,
get_start_time, get_exit_time, get_running_time,
get_services, get_dep_policy, get_peb, get_peb_address,
get_entry_point, get_main_module, get_image_base, get_image_name,
get_command_line, get_environment,
get_command_line_block,
get_environment_block, get_environment_variables,
get_handle, open_handle, close_handle
@group Instrumentation:
kill, wait, suspend, resume, inject_code, inject_dll, clean_exit
@group Disassembly:
disassemble, disassemble_around, disassemble_around_pc,
disassemble_string, disassemble_instruction, disassemble_current
@group Debugging:
flush_instruction_cache, debug_break, peek_pointers_in_data
@group Memory mapping:
take_memory_snapshot, generate_memory_snapshot, iter_memory_snapshot,
restore_memory_snapshot, get_memory_map, get_mapped_filenames,
generate_memory_map, iter_memory_map,
is_pointer, is_address_valid, is_address_free, is_address_reserved,
is_address_commited, is_address_guard, is_address_readable,
is_address_writeable, is_address_copy_on_write, is_address_executable,
is_address_executable_and_writeable,
is_buffer,
is_buffer_readable, is_buffer_writeable, is_buffer_executable,
is_buffer_executable_and_writeable, is_buffer_copy_on_write
@group Memory allocation:
malloc, free, mprotect, mquery
@group Memory read:
read, read_char, read_int, read_uint, read_float, read_double,
read_dword, read_qword, read_pointer, read_string, read_structure,
peek, peek_char, peek_int, peek_uint, peek_float, peek_double,
peek_dword, peek_qword, peek_pointer, peek_string
@group Memory write:
write, write_char, write_int, write_uint, write_float, write_double,
write_dword, write_qword, write_pointer,
poke, poke_char, poke_int, poke_uint, poke_float, poke_double,
poke_dword, poke_qword, poke_pointer
@group Memory search:
search, search_bytes, search_hexa, search_text
@group Processes snapshot:
scan, clear, __contains__, __iter__, __len__
@group Deprecated:
get_environment_data, parse_environment_data
@type dwProcessId: int
@ivar dwProcessId: Global process ID. Use L{get_pid} instead.
@type hProcess: L{ProcessHandle}
@ivar hProcess: Handle to the process. Use L{get_handle} instead.
@type fileName: str
@ivar fileName: Filename of the main module. Use L{get_filename} instead.
"""
def __init__(self, dwProcessId, hProcess = None, fileName = None):
"""
@type dwProcessId: int
@param dwProcessId: Global process ID.
@type hProcess: L{ProcessHandle}
@param hProcess: Handle to the process.
@type fileName: str
@param fileName: (Optional) Filename of the main module.
"""
_ThreadContainer.__init__(self)
_ModuleContainer.__init__(self)
self.dwProcessId = dwProcessId
self.hProcess = hProcess
self.fileName = fileName
def get_pid(self):
"""
@rtype: int
@return: Process global ID.
"""
return self.dwProcessId
def get_filename(self):
"""
@rtype: str
@return: Filename of the main module of the process.
"""
if not self.fileName:
self.fileName = self.get_image_name()
return self.fileName
def open_handle(self, dwDesiredAccess = win32.PROCESS_ALL_ACCESS):
"""
Opens a new handle to the process.
The new handle is stored in the L{hProcess} property.
@warn: Normally you should call L{get_handle} instead, since it's much
"smarter" and tries to reuse handles and merge access rights.
@type dwDesiredAccess: int
@param dwDesiredAccess: Desired access rights.
Defaults to L{win32.PROCESS_ALL_ACCESS}.
See: U{http://msdn.microsoft.com/en-us/library/windows/desktop/ms684880(v=vs.85).aspx}
@raise WindowsError: It's not possible to open a handle to the process
with the requested access rights. This typically happens because
the target process is a system process and the debugger is not
runnning with administrative rights.
"""
hProcess = win32.OpenProcess(dwDesiredAccess, win32.FALSE, self.dwProcessId)
try:
self.close_handle()
except Exception:
warnings.warn(
"Failed to close process handle: %s" % traceback.format_exc())
self.hProcess = hProcess
def close_handle(self):
"""
Closes the handle to the process.
@note: Normally you don't need to call this method. All handles
created by I{WinAppDbg} are automatically closed when the garbage
collector claims them. So unless you've been tinkering with it,
setting L{hProcess} to C{None} should be enough.
"""
try:
if hasattr(self.hProcess, 'close'):
self.hProcess.close()
elif self.hProcess not in (None, win32.INVALID_HANDLE_VALUE):
win32.CloseHandle(self.hProcess)
finally:
self.hProcess = None
def get_handle(self, dwDesiredAccess = win32.PROCESS_ALL_ACCESS):
"""
Returns a handle to the process with I{at least} the access rights
requested.
@note:
If a handle was previously opened and has the required access
rights, it's reused. If not, a new handle is opened with the
combination of the old and new access rights.
@type dwDesiredAccess: int
@param dwDesiredAccess: Desired access rights.
Defaults to L{win32.PROCESS_ALL_ACCESS}.
See: U{http://msdn.microsoft.com/en-us/library/windows/desktop/ms684880(v=vs.85).aspx}
@rtype: L{ProcessHandle}
@return: Handle to the process.
@raise WindowsError: It's not possible to open a handle to the process
with the requested access rights. This typically happens because
the target process is a system process and the debugger is not
runnning with administrative rights.
"""
if self.hProcess in (None, win32.INVALID_HANDLE_VALUE):
self.open_handle(dwDesiredAccess)
else:
dwAccess = self.hProcess.dwAccess
if (dwAccess | dwDesiredAccess) != dwAccess:
self.open_handle(dwAccess | dwDesiredAccess)
return self.hProcess
#------------------------------------------------------------------------------
# Not really sure if it's a good idea...
## def __eq__(self, aProcess):
## """
## Compare two Process objects. The comparison is made using the IDs.
##
## @warning:
## If you have two Process instances with different handles the
## equality operator still returns C{True}, so be careful!
##
## @type aProcess: L{Process}
## @param aProcess: Another Process object.
##
## @rtype: bool
## @return: C{True} if the two process IDs are equal,
## C{False} otherwise.
## """
## return isinstance(aProcess, Process) and \
## self.get_pid() == aProcess.get_pid()
def __contains__(self, anObject):
"""
The same as: C{self.has_thread(anObject) or self.has_module(anObject)}
@type anObject: L{Thread}, L{Module} or int
@param anObject: Object to look for.
Can be a Thread, Module, thread global ID or module base address.
@rtype: bool
@return: C{True} if the requested object was found in the snapshot.
"""
return _ThreadContainer.__contains__(self, anObject) or \
_ModuleContainer.__contains__(self, anObject)
def __len__(self):
"""
@see: L{get_thread_count}, L{get_module_count}
@rtype: int
@return: Count of L{Thread} and L{Module} objects in this snapshot.
"""
return _ThreadContainer.__len__(self) + \
_ModuleContainer.__len__(self)
class __ThreadsAndModulesIterator (object):
"""
Iterator object for L{Process} objects.
Iterates through L{Thread} objects first, L{Module} objects next.
"""
def __init__(self, container):
"""
@type container: L{Process}
@param container: L{Thread} and L{Module} container.
"""
self.__container = container
self.__iterator = None
self.__state = 0
def __iter__(self):
'x.__iter__() <==> iter(x)'
return self
def next(self):
'x.next() -> the next value, or raise StopIteration'
if self.__state == 0:
self.__iterator = self.__container.iter_threads()
self.__state = 1
if self.__state == 1:
try:
return self.__iterator.next()
except StopIteration:
self.__iterator = self.__container.iter_modules()
self.__state = 2
if self.__state == 2:
try:
return self.__iterator.next()
except StopIteration:
self.__iterator = None
self.__state = 3
raise StopIteration
def __iter__(self):
"""
@see: L{iter_threads}, L{iter_modules}
@rtype: iterator
@return: Iterator of L{Thread} and L{Module} objects in this snapshot.
All threads are iterated first, then all modules.
"""
return self.__ThreadsAndModulesIterator(self)
#------------------------------------------------------------------------------
def wait(self, dwTimeout = None):
"""
Waits for the process to finish executing.
@raise WindowsError: On error an exception is raised.
"""
self.get_handle(win32.SYNCHRONIZE).wait(dwTimeout)
def kill(self, dwExitCode = 0):
"""
Terminates the execution of the process.
@raise WindowsError: On error an exception is raised.
"""
hProcess = self.get_handle(win32.PROCESS_TERMINATE)
win32.TerminateProcess(hProcess, dwExitCode)
def suspend(self):
"""
Suspends execution on all threads of the process.
@raise WindowsError: On error an exception is raised.
"""
self.scan_threads() # force refresh the snapshot
suspended = list()
try:
for aThread in self.iter_threads():
aThread.suspend()
suspended.append(aThread)
except Exception:
for aThread in suspended:
try:
aThread.resume()
except Exception:
pass
raise
def resume(self):
"""
Resumes execution on all threads of the process.
@raise WindowsError: On error an exception is raised.
"""
if self.get_thread_count() == 0:
self.scan_threads() # only refresh the snapshot if empty
resumed = list()
try:
for aThread in self.iter_threads():
aThread.resume()
resumed.append(aThread)
except Exception:
for aThread in resumed:
try:
aThread.suspend()
except Exception:
pass
raise
def is_debugged(self):
"""
Tries to determine if the process is being debugged by another process.
It may detect other debuggers besides WinAppDbg.
@rtype: bool
@return: C{True} if the process has a debugger attached.
@warning:
May return inaccurate results when some anti-debug techniques are
used by the target process.
@note: To know if a process currently being debugged by a L{Debug}
object, call L{Debug.is_debugee} instead.
"""
# FIXME the MSDN docs don't say what access rights are needed here!
hProcess = self.get_handle(win32.PROCESS_QUERY_INFORMATION)
return win32.CheckRemoteDebuggerPresent(hProcess)
def is_alive(self):
"""
@rtype: bool
@return: C{True} if the process is currently running.
"""
try:
self.wait(0)
except WindowsError, e:
return e.winerror == win32.WAIT_TIMEOUT
return False
def get_exit_code(self):
"""
@rtype: int
@return: Process exit code, or C{STILL_ACTIVE} if it's still alive.
@warning: If a process returns C{STILL_ACTIVE} as it's exit code,
you may not be able to determine if it's active or not with this
method. Use L{is_alive} to check if the process is still active.
Alternatively you can call L{get_handle} to get the handle object
and then L{ProcessHandle.wait} on it to wait until the process
finishes running.
"""
if win32.PROCESS_ALL_ACCESS == win32.PROCESS_ALL_ACCESS_VISTA:
dwAccess = win32.PROCESS_QUERY_LIMITED_INFORMATION
else:
dwAccess = win32.PROCESS_QUERY_INFORMATION
return win32.GetExitCodeProcess( self.get_handle(dwAccess) )
#------------------------------------------------------------------------------
def scan(self):
"""
Populates the snapshot of threads and modules.
"""
self.scan_threads()
self.scan_modules()
def clear(self):
"""
Clears the snapshot of threads and modules.
"""
try:
try:
self.clear_threads()
finally:
self.clear_modules()
finally:
self.close_handle()
#------------------------------------------------------------------------------
# Regular expression to find hexadecimal values of any size.
__hexa_parameter = re.compile('0x[0-9A-Fa-f]+')
def __fixup_labels(self, disasm):
"""
Private method used when disassembling from process memory.
It has no return value because the list is modified in place. On return
all raw memory addresses are replaced by labels when possible.
@type disasm: list of tuple(int, int, str, str)
@param disasm: Output of one of the dissassembly functions.
"""
for index in xrange(len(disasm)):
(address, size, text, dump) = disasm[index]
m = self.__hexa_parameter.search(text)
while m:
s, e = m.span()
value = text[s:e]
try:
label = self.get_label_at_address( int(value, 0x10) )
except Exception:
label = None
if label:
text = text[:s] + label + text[e:]
e = s + len(value)
m = self.__hexa_parameter.search(text, e)
disasm[index] = (address, size, text, dump)
def disassemble_string(self, lpAddress, code):
"""
Disassemble instructions from a block of binary code.
@type lpAddress: int
@param lpAddress: Memory address where the code was read from.
@type code: str
@param code: Binary code to disassemble.
@rtype: list of tuple( long, int, str, str )
@return: List of tuples. Each tuple represents an assembly instruction
and contains:
- Memory address of instruction.
- Size of instruction in bytes.
- Disassembly line of instruction.
- Hexadecimal dump of instruction.
@raise NotImplementedError:
No compatible disassembler was found for the current platform.
"""
try:
disasm = self.__disasm
except AttributeError:
disasm = self.__disasm = Disassembler( self.get_arch() )
return disasm.decode(lpAddress, code)
def disassemble(self, lpAddress, dwSize):
"""
Disassemble instructions from the address space of the process.
@type lpAddress: int
@param lpAddress: Memory address where to read the code from.
@type dwSize: int
@param dwSize: Size of binary code to disassemble.
@rtype: list of tuple( long, int, str, str )
@return: List of tuples. Each tuple represents an assembly instruction
and contains:
- Memory address of instruction.
- Size of instruction in bytes.
- Disassembly line of instruction.
- Hexadecimal dump of instruction.
"""
data = self.read(lpAddress, dwSize)
disasm = self.disassemble_string(lpAddress, data)
self.__fixup_labels(disasm)
return disasm
# FIXME
# This algorithm really sucks, I've got to write a better one :P
def disassemble_around(self, lpAddress, dwSize = 64):
"""
Disassemble around the given address.
@type lpAddress: int
@param lpAddress: Memory address where to read the code from.
@type dwSize: int
@param dwSize: Delta offset.
Code will be read from lpAddress - dwSize to lpAddress + dwSize.
@rtype: list of tuple( long, int, str, str )
@return: List of tuples. Each tuple represents an assembly instruction
and contains:
- Memory address of instruction.
- Size of instruction in bytes.
- Disassembly line of instruction.
- Hexadecimal dump of instruction.
"""
dwDelta = int(float(dwSize) / 2.0)
addr_1 = lpAddress - dwDelta
addr_2 = lpAddress
size_1 = dwDelta
#size_2 = dwSize - dwDelta
data = self.read(addr_1, dwSize)
data_1 = data[:size_1]
data_2 = data[size_1:]
disasm_1 = self.disassemble_string(addr_1, data_1)
disasm_2 = self.disassemble_string(addr_2, data_2)
disasm = disasm_1 + disasm_2
self.__fixup_labels(disasm)
return disasm
def disassemble_around_pc(self, dwThreadId, dwSize = 64):
"""
Disassemble around the program counter of the given thread.
@type dwThreadId: int
@param dwThreadId: Global thread ID.
The program counter for this thread will be used as the disassembly
address.
@type dwSize: int
@param dwSize: Delta offset.
Code will be read from pc - dwSize to pc + dwSize.
@rtype: list of tuple( long, int, str, str )
@return: List of tuples. Each tuple represents an assembly instruction
and contains:
- Memory address of instruction.
- Size of instruction in bytes.
- Disassembly line of instruction.
- Hexadecimal dump of instruction.
"""
aThread = self.get_thread(dwThreadId)
return self.disassemble_around(aThread.get_pc(), dwSize)
def disassemble_instruction(self, lpAddress):
"""
Disassemble the instruction at the given memory address.
@type lpAddress: int
@param lpAddress: Memory address where to read the code from.
@rtype: tuple( long, int, str, str )
@return: The tuple represents an assembly instruction
and contains:
- Memory address of instruction.
- Size of instruction in bytes.
- Disassembly line of instruction.
- Hexadecimal dump of instruction.
"""
return self.disassemble(lpAddress, 15)[0]
def disassemble_current(self, dwThreadId):
"""
Disassemble the instruction at the program counter of the given thread.
@type dwThreadId: int
@param dwThreadId: Global thread ID.
The program counter for this thread will be used as the disassembly
address.
@rtype: tuple( long, int, str, str )
@return: The tuple represents an assembly instruction
and contains:
- Memory address of instruction.
- Size of instruction in bytes.
- Disassembly line of instruction.
- Hexadecimal dump of instruction.
"""
aThread = self.get_thread(dwThreadId)
return self.disassemble_instruction(aThread.get_pc())
#------------------------------------------------------------------------------
def flush_instruction_cache(self):
"""
Flush the instruction cache. This is required if the process memory is
modified and one or more threads are executing nearby the modified
memory region.
@see: U{http://blogs.msdn.com/oldnewthing/archive/2003/12/08/55954.aspx#55958}
@raise WindowsError: Raises exception on error.
"""
# FIXME
# No idea what access rights are required here!
# Maybe PROCESS_VM_OPERATION ???
# In any case we're only calling this from the debugger,
# so it should be fine (we already have PROCESS_ALL_ACCESS).
win32.FlushInstructionCache( self.get_handle() )
def debug_break(self):
"""
Triggers the system breakpoint in the process.
@raise WindowsError: On error an exception is raised.
"""
# The exception is raised by a new thread.
# When continuing the exception, the thread dies by itself.
# This thread is hidden from the debugger.
win32.DebugBreakProcess( self.get_handle() )
def is_wow64(self):
"""
Determines if the process is running under WOW64.
@rtype: bool
@return:
C{True} if the process is running under WOW64. That is, a 32-bit
application running in a 64-bit Windows.
C{False} if the process is either a 32-bit application running in
a 32-bit Windows, or a 64-bit application running in a 64-bit
Windows.
@raise WindowsError: On error an exception is raised.
@see: U{http://msdn.microsoft.com/en-us/library/aa384249(VS.85).aspx}
"""
try:
wow64 = self.__wow64
except AttributeError:
if (win32.bits == 32 and not win32.wow64):
wow64 = False
else:
if win32.PROCESS_ALL_ACCESS == win32.PROCESS_ALL_ACCESS_VISTA:
dwAccess = win32.PROCESS_QUERY_LIMITED_INFORMATION
else:
dwAccess = win32.PROCESS_QUERY_INFORMATION
hProcess = self.get_handle(dwAccess)
try:
wow64 = win32.IsWow64Process(hProcess)
except AttributeError:
wow64 = False
self.__wow64 = wow64
return wow64
def get_arch(self):
"""
@rtype: str
@return: The architecture in which this process believes to be running.
For example, if running a 32 bit binary in a 64 bit machine, the
architecture returned by this method will be L{win32.ARCH_I386},
but the value of L{System.arch} will be L{win32.ARCH_AMD64}.
"""
# Are we in a 32 bit machine?
if win32.bits == 32 and not win32.wow64:
return win32.arch
# Is the process outside of WOW64?
if not self.is_wow64():
return win32.arch
# In WOW64, "amd64" becomes "i386".
if win32.arch == win32.ARCH_AMD64:
return win32.ARCH_I386
# We don't know the translation for other architectures.
raise NotImplementedError()
def get_bits(self):
"""
@rtype: str
@return: The number of bits in which this process believes to be
running. For example, if running a 32 bit binary in a 64 bit
machine, the number of bits returned by this method will be C{32},
but the value of L{System.arch} will be C{64}.
"""
# Are we in a 32 bit machine?
if win32.bits == 32 and not win32.wow64:
# All processes are 32 bits.
return 32
# Is the process inside WOW64?
if self.is_wow64():
# The process is 32 bits.
return 32
# The process is 64 bits.
return 64
# TODO: get_os, to test compatibility run
# See: http://msdn.microsoft.com/en-us/library/windows/desktop/ms683224(v=vs.85).aspx
#------------------------------------------------------------------------------
def get_start_time(self):
"""
Determines when has this process started running.
@rtype: win32.SYSTEMTIME
@return: Process start time.
"""
if win32.PROCESS_ALL_ACCESS == win32.PROCESS_ALL_ACCESS_VISTA:
dwAccess = win32.PROCESS_QUERY_LIMITED_INFORMATION
else:
dwAccess = win32.PROCESS_QUERY_INFORMATION
hProcess = self.get_handle(dwAccess)
CreationTime = win32.GetProcessTimes(hProcess)[0]
return win32.FileTimeToSystemTime(CreationTime)
def get_exit_time(self):
"""
Determines when has this process finished running.
If the process is still alive, the current time is returned instead.
@rtype: win32.SYSTEMTIME
@return: Process exit time.
"""
if self.is_alive():
ExitTime = win32.GetSystemTimeAsFileTime()
else:
if win32.PROCESS_ALL_ACCESS == win32.PROCESS_ALL_ACCESS_VISTA:
dwAccess = win32.PROCESS_QUERY_LIMITED_INFORMATION
else:
dwAccess = win32.PROCESS_QUERY_INFORMATION
hProcess = self.get_handle(dwAccess)
ExitTime = win32.GetProcessTimes(hProcess)[1]
return win32.FileTimeToSystemTime(ExitTime)
def get_running_time(self):
"""
Determines how long has this process been running.
@rtype: long
@return: Process running time in milliseconds.
"""
if win32.PROCESS_ALL_ACCESS == win32.PROCESS_ALL_ACCESS_VISTA:
dwAccess = win32.PROCESS_QUERY_LIMITED_INFORMATION
else:
dwAccess = win32.PROCESS_QUERY_INFORMATION
hProcess = self.get_handle(dwAccess)
(CreationTime, ExitTime, _, _) = win32.GetProcessTimes(hProcess)
if self.is_alive():
ExitTime = win32.GetSystemTimeAsFileTime()
CreationTime = CreationTime.dwLowDateTime + (CreationTime.dwHighDateTime << 32)
ExitTime = ExitTime.dwLowDateTime + ( ExitTime.dwHighDateTime << 32)
RunningTime = ExitTime - CreationTime
return RunningTime / 10000 # 100 nanoseconds steps => milliseconds
#------------------------------------------------------------------------------
def __load_System_class(self):
global System # delayed import
if System is None:
from system import System
def get_services(self):
"""
Retrieves the list of system services that are currently running in
this process.
@see: L{System.get_services}
@rtype: list( L{win32.ServiceStatusProcessEntry} )
@return: List of service status descriptors.
"""
self.__load_System_class()
pid = self.get_pid()
return [d for d in System.get_active_services() if d.ProcessId == pid]
#------------------------------------------------------------------------------
def get_dep_policy(self):
"""
Retrieves the DEP (Data Execution Prevention) policy for this process.
@note: This method is only available in Windows XP SP3 and above, and
only for 32 bit processes. It will fail in any other circumstance.
@see: U{http://msdn.microsoft.com/en-us/library/bb736297(v=vs.85).aspx}
@rtype: tuple(int, int)
@return:
The first member of the tuple is the DEP flags. It can be a
combination of the following values:
- 0: DEP is disabled for this process.
- 1: DEP is enabled for this process. (C{PROCESS_DEP_ENABLE})
- 2: DEP-ATL thunk emulation is disabled for this process.
(C{PROCESS_DEP_DISABLE_ATL_THUNK_EMULATION})
The second member of the tuple is the permanent flag. If C{TRUE}
the DEP settings cannot be changed in runtime for this process.
@raise WindowsError: On error an exception is raised.
"""
hProcess = self.get_handle(win32.PROCESS_QUERY_INFORMATION)
try:
return win32.kernel32.GetProcessDEPPolicy(hProcess)
except AttributeError:
msg = "This method is only available in Windows XP SP3 and above."
raise NotImplementedError(msg)
#------------------------------------------------------------------------------
def get_peb(self):
"""
Returns a copy of the PEB.
To dereference pointers in it call L{Process.read_structure}.
@rtype: L{win32.PEB}
@return: PEB structure.
@raise WindowsError: An exception is raised on error.
"""
self.get_handle( win32.PROCESS_VM_READ |
win32.PROCESS_QUERY_INFORMATION )
return self.read_structure(self.get_peb_address(), win32.PEB)
def get_peb_address(self):
"""
Returns a remote pointer to the PEB.
@rtype: int
@return: Remote pointer to the L{win32.PEB} structure.
Returns C{None} on error.
"""
try:
return self._peb_ptr
except AttributeError:
hProcess = self.get_handle(win32.PROCESS_QUERY_INFORMATION)
pbi = win32.NtQueryInformationProcess(hProcess,
win32.ProcessBasicInformation)
address = pbi.PebBaseAddress
self._peb_ptr = address
return address
def get_entry_point(self):
"""
Alias to C{process.get_main_module().get_entry_point()}.
@rtype: int
@return: Address of the entry point of the main module.
"""
return self.get_main_module().get_entry_point()
def get_main_module(self):
"""
@rtype: L{Module}
@return: Module object for the process main module.
"""
return self.get_module(self.get_image_base())
def get_image_base(self):
"""
@rtype: int
@return: Image base address for the process main module.
"""
return self.get_peb().ImageBaseAddress
def get_image_name(self):
"""
@rtype: int
@return: Filename of the process main module.
This method does it's best to retrieve the filename.
However sometimes this is not possible, so C{None} may
be returned instead.
"""
# Method 1: Module.fileName
# It's cached if the filename was already found by the other methods,
# if it came with the corresponding debug event, or it was found by the
# toolhelp API.
mainModule = None
try:
mainModule = self.get_main_module()
name = mainModule.fileName
if not name:
name = None
except (KeyError, AttributeError, WindowsError):
## traceback.print_exc() # XXX DEBUG
name = None
# Method 2: QueryFullProcessImageName()
# Not implemented until Windows Vista.
if not name:
try:
hProcess = self.get_handle(
win32.PROCESS_QUERY_LIMITED_INFORMATION)
name = win32.QueryFullProcessImageName(hProcess)
except (AttributeError, WindowsError):
## traceback.print_exc() # XXX DEBUG
name = None
# Method 3: GetProcessImageFileName()
#
# Not implemented until Windows XP.
# For more info see:
# https://voidnish.wordpress.com/2005/06/20/getprocessimagefilenamequerydosdevice-trivia/
if not name:
try:
hProcess = self.get_handle(win32.PROCESS_QUERY_INFORMATION)
name = win32.GetProcessImageFileName(hProcess)
if name:
name = PathOperations.native_to_win32_pathname(name)
else:
name = None
except (AttributeError, WindowsError):
## traceback.print_exc() # XXX DEBUG
if not name:
name = None
# Method 4: GetModuleFileNameEx()
# Not implemented until Windows 2000.
#
# May be spoofed by malware, since this information resides
# in usermode space (see http://www.ragestorm.net/blogs/?p=163).
if not name:
try:
hProcess = self.get_handle( win32.PROCESS_VM_READ |
win32.PROCESS_QUERY_INFORMATION )
try:
name = win32.GetModuleFileNameEx(hProcess)
except WindowsError:
## traceback.print_exc() # XXX DEBUG
name = win32.GetModuleFileNameEx(
hProcess, self.get_image_base())
if name:
name = PathOperations.native_to_win32_pathname(name)
else:
name = None
except (AttributeError, WindowsError):
## traceback.print_exc() # XXX DEBUG
if not name:
name = None
# Method 5: PEB.ProcessParameters->ImagePathName
#
# May fail since it's using an undocumented internal structure.
#
# May be spoofed by malware, since this information resides
# in usermode space (see http://www.ragestorm.net/blogs/?p=163).
if not name:
try:
peb = self.get_peb()
pp = self.read_structure(peb.ProcessParameters,
win32.RTL_USER_PROCESS_PARAMETERS)
s = pp.ImagePathName
name = self.peek_string(s.Buffer,
dwMaxSize=s.MaximumLength, fUnicode=True)
if name:
name = PathOperations.native_to_win32_pathname(name)
else:
name = None
except (AttributeError, WindowsError):
## traceback.print_exc() # XXX DEBUG
name = None
# Method 6: Module.get_filename()
# It tries to get the filename from the file handle.
#
# There are currently some problems due to the strange way the API
# works - it returns the pathname without the drive letter, and I
# couldn't figure out a way to fix it.
if not name and mainModule is not None:
try:
name = mainModule.get_filename()
if not name:
name = None
except (AttributeError, WindowsError):
## traceback.print_exc() # XXX DEBUG
name = None
# Remember the filename.
if name and mainModule is not None:
mainModule.fileName = name
# Return the image filename, or None on error.
return name
def get_command_line_block(self):
"""
Retrieves the command line block memory address and size.
@rtype: tuple(int, int)
@return: Tuple with the memory address of the command line block
and it's maximum size in Unicode characters.
@raise WindowsError: On error an exception is raised.
"""
peb = self.get_peb()
pp = self.read_structure(peb.ProcessParameters,
win32.RTL_USER_PROCESS_PARAMETERS)
s = pp.CommandLine
return (s.Buffer, s.MaximumLength)
def get_environment_block(self):
"""
Retrieves the environment block memory address for the process.
@note: The size is always enough to contain the environment data, but
it may not be an exact size. It's best to read the memory and
scan for two null wide chars to find the actual size.
@rtype: tuple(int, int)
@return: Tuple with the memory address of the environment block
and it's size.
@raise WindowsError: On error an exception is raised.
"""
peb = self.get_peb()
pp = self.read_structure(peb.ProcessParameters,
win32.RTL_USER_PROCESS_PARAMETERS)
Environment = pp.Environment
try:
EnvironmentSize = pp.EnvironmentSize
except AttributeError:
mbi = self.mquery(Environment)
EnvironmentSize = mbi.RegionSize + mbi.BaseAddress - Environment
return (Environment, EnvironmentSize)
def get_command_line(self):
"""
Retrieves the command line with wich the program was started.
@rtype: str
@return: Command line string.
@raise WindowsError: On error an exception is raised.
"""
(Buffer, MaximumLength) = self.get_command_line_block()
CommandLine = self.peek_string(Buffer, dwMaxSize=MaximumLength,
fUnicode=True)
gst = win32.GuessStringType
if gst.t_default == gst.t_ansi:
CommandLine = CommandLine.encode('cp1252')
return CommandLine
def get_environment_variables(self):
"""
Retrieves the environment variables with wich the program is running.
@rtype: list of tuple(unicode, unicode)
@return: Environment keys and values as found in the process memory.
@raise WindowsError: On error an exception is raised.
"""
# Note: the first bytes are garbage and must be skipped. Then the first
# two environment entries are the current drive and directory as key
# and value pairs, followed by the ExitCode variable (it's what batch
# files know as "errorlevel"). After that, the real environment vars
# are there in alphabetical order. In theory that's where it stops,
# but I've always seen one more "variable" tucked at the end which
# may be another environment block but in ANSI. I haven't examined it
# yet, I'm just skipping it because if it's parsed as Unicode it just
# renders garbage.
# Read the environment block contents.
data = self.peek( *self.get_environment_block() )
# Put them into a Unicode buffer.
tmp = ctypes.create_string_buffer(data)
buffer = ctypes.create_unicode_buffer(len(data))
ctypes.memmove(buffer, tmp, len(data))
del tmp
# Skip until the first Unicode null char is found.
pos = 0
while buffer[pos] != u'\0':
pos += 1
pos += 1
# Loop for each environment variable...
environment = []
while buffer[pos] != u'\0':
# Until we find a null char...
env_name_pos = pos
env_name = u''
found_name = False
while buffer[pos] != u'\0':
# Get the current char.
char = buffer[pos]
# Is it an equal sign?
if char == u'=':
# Skip leading equal signs.
if env_name_pos == pos:
env_name_pos += 1
pos += 1
continue
# Otherwise we found the separator equal sign.
pos += 1
found_name = True
break
# Add the char to the variable name.
env_name += char
# Next char.
pos += 1
# If the name was not parsed properly, stop.
if not found_name:
break
# Read the variable value until we find a null char.
env_value = u''
while buffer[pos] != u'\0':
env_value += buffer[pos]
pos += 1
# Skip the null char.
pos += 1
# Add to the list of environment variables found.
environment.append( (env_name, env_value) )
# Remove the last entry, it's garbage.
if environment:
environment.pop()
# Return the environment variables.
return environment
def get_environment_data(self, fUnicode = None):
"""
Retrieves the environment block data with wich the program is running.
@warn: Deprecated since WinAppDbg 1.5.
@see: L{win32.GuessStringType}
@type fUnicode: bool or None
@param fUnicode: C{True} to return a list of Unicode strings, C{False}
to return a list of ANSI strings, or C{None} to return whatever
the default is for string types.
@rtype: list of str
@return: Environment keys and values separated by a (C{=}) character,
as found in the process memory.
@raise WindowsError: On error an exception is raised.
"""
# Issue a deprecation warning.
warnings.warn(
"Process.get_environment_data() is deprecated" \
" since WinAppDbg 1.5.",
DeprecationWarning)
# Get the environment variables.
block = [ key + u'=' + value for (key, value) \
in self.get_environment_variables() ]
# Convert the data to ANSI if requested.
if fUnicode is None:
gst = win32.GuessStringType
fUnicode = gst.t_default == gst.t_unicode
if not fUnicode:
block = [x.encode('cp1252') for x in block]
# Return the environment data.
return block
@staticmethod
def parse_environment_data(block):
"""
Parse the environment block into a Python dictionary.
@warn: Deprecated since WinAppDbg 1.5.
@note: Values of duplicated keys are joined using null characters.
@type block: list of str
@param block: List of strings as returned by L{get_environment_data}.
@rtype: dict(str S{->} str)
@return: Dictionary of environment keys and values.
"""
# Issue a deprecation warning.
warnings.warn(
"Process.parse_environment_data() is deprecated" \
" since WinAppDbg 1.5.",
DeprecationWarning)
# Create an empty environment dictionary.
environment = dict()
# End here if the environment block is empty.
if not block:
return environment
# Prepare the tokens (ANSI or Unicode).
gst = win32.GuessStringType
if type(block[0]) == gst.t_ansi:
equals = '='
terminator = '\0'
else:
equals = u'='
terminator = u'\0'
# Split the blocks into key/value pairs.
for chunk in block:
sep = chunk.find(equals, 1)
if sep < 0:
## raise Exception()
continue # corrupted environment block?
key, value = chunk[:sep], chunk[sep+1:]
# For duplicated keys, append the value.
# Values are separated using null terminators.
if key not in environment:
environment[key] = value
else:
environment[key] += terminator + value
# Return the environment dictionary.
return environment
def get_environment(self, fUnicode = None):
"""
Retrieves the environment with wich the program is running.
@note: Duplicated keys are joined using null characters.
To avoid this behavior, call L{get_environment_variables} instead
and convert the results to a dictionary directly, like this:
C{dict(process.get_environment_variables())}
@see: L{win32.GuessStringType}
@type fUnicode: bool or None
@param fUnicode: C{True} to return a list of Unicode strings, C{False}
to return a list of ANSI strings, or C{None} to return whatever
the default is for string types.
@rtype: dict(str S{->} str)
@return: Dictionary of environment keys and values.
@raise WindowsError: On error an exception is raised.
"""
# Get the environment variables.
variables = self.get_environment_variables()
# Convert the strings to ANSI if requested.
if fUnicode is None:
gst = win32.GuessStringType
fUnicode = gst.t_default == gst.t_unicode
if not fUnicode:
variables = [ ( key.encode('cp1252'), value.encode('cp1252') ) \
for (key, value) in variables ]
# Add the variables to a dictionary, concatenating duplicates.
environment = dict()
for key, value in variables:
if key in environment:
environment[key] = environment[key] + u'\0' + value
else:
environment[key] = value
# Return the dictionary.
return environment
#------------------------------------------------------------------------------
def search(self, pattern, minAddr = None, maxAddr = None):
"""
Search for the given string or pattern within the process memory.
@note: This is a more limited version of L{Search.search_process}.
@type pattern: C{str} or L{Pattern}
@param pattern: String or hexadecimal pattern to search for.
@type minAddr: int
@param minAddr: (Optional) Start the search at this memory address.
@type maxAddr: int
@param maxAddr: (Optional) Stop the search at this memory address.
@rtype: iterator of tuple( int, str )
@return: An iterator of tuples. Each tuple contains the following:
- The memory address where the pattern was found.
- The data that matches the pattern.
@raise WindowsError: An error occurred when querying or reading the
process memory.
"""
if isinstance(pattern, str):
if HexInput.is_pattern(pattern):
return Search.search_process(
self, [pattern], minAddr, maxAddr)
return self.search_bytes(pattern, minAddr, maxAddr)
if isinstance(pattern, unicode):
return self.search_bytes(
pattern.encode("utf-16le"), minAddr, maxAddr)
if isinstance(pattern, Pattern):
return Search.search_process(
self, [pattern], minAddr, maxAddr)
raise TypeError("Unknown pattern type: %r" % type(pattern))
def search_bytes(self, bytes, minAddr = None, maxAddr = None):
"""
Search for the given byte pattern within the process memory.
@note: This is a more limited version of L{Search.search_process}.
@type bytes: str
@param bytes: Bytes to search for.
@type minAddr: int
@param minAddr: (Optional) Start the search at this memory address.
@type maxAddr: int
@param maxAddr: (Optional) Stop the search at this memory address.
@rtype: iterator of int
@return: An iterator of memory addresses where the pattern was found.
@raise WindowsError: An error occurred when querying or reading the
process memory.
"""
pattern = StringPattern(bytes)
return (x[0] for x in Search.search_process(
self, [pattern], minAddr, maxAddr))
def search_text(self, text, encoding = "utf-16le",
caseSensitive = False,
minAddr = None,
maxAddr = None):
"""
Search for the given text within the process memory.
@note: This is a more limited version of L{Search.search_process}.
@type text: str or unicode
@param text: Text to search for.
@type encoding: str
@param encoding: (Optional) Encoding for the text parameter.
Only used when the text to search for is a Unicode string.
Don't change unless you know what you're doing!
@type caseSensitive: bool
@param caseSensitive: C{True} of the search is case sensitive,
C{False} otherwise.
@type minAddr: int
@param minAddr: (Optional) Start the search at this memory address.
@type maxAddr: int
@param maxAddr: (Optional) Stop the search at this memory address.
@rtype: iterator of tuple( int, str )
@return: An iterator of tuples. Each tuple contains the following:
- The memory address where the pattern was found.
- The text that matches the pattern.
@raise WindowsError: An error occurred when querying or reading the
process memory.
"""
bytes = text.encode(encoding)
if caseSensitive:
pattern = StringPattern(bytes)
else:
pattern = IStringPattern(bytes)
return Search.search_process(self, [pattern], minAddr, maxAddr)
def search_hexa(self, hexa, minAddr = None, maxAddr = None):
"""
Search for the given hexadecimal pattern within the process memory.
@note: This is a more limited version of L{Search.search_process}.
Hex patterns must be in this form::
"68 65 6c 6c 6f 20 77 6f 72 6c 64" # "hello world"
Spaces are optional. Capitalization of hex digits doesn't matter.
This is exactly equivalent to the previous example::
"68656C6C6F20776F726C64" # "hello world"
Wildcards are allowed, in the form of a C{?} sign in any hex digit::
"5? 5? c3" # pop register / pop register / ret
"b8 ?? ?? ?? ??" # mov eax, immediate value
@type hexa: str
@param hexa: Pattern to search for.
@type minAddr: int
@param minAddr: (Optional) Start the search at this memory address.
@type maxAddr: int
@param maxAddr: (Optional) Stop the search at this memory address.
@rtype: iterator of tuple( int, str )
@return: An iterator of tuples. Each tuple contains the following:
- The memory address where the pattern was found.
- The bytes that match the pattern.
@raise WindowsError: An error occurred when querying or reading the
process memory.
"""
pattern = HexPattern(hexa)
return Search.search_process(self, [pattern], minAddr, maxAddr)
#------------------------------------------------------------------------------
def __read_c_type(self, address, format, c_type):
size = ctypes.sizeof(c_type)
packed = self.read(address, size)
if len(packed) != size:
raise ctypes.WinError()
return struct.unpack(format, packed)[0]
def __write_c_type(self, address, format, unpacked):
packed = struct.pack('@L', unpacked)
self.write(address, packed)
# XXX TODO
# + Maybe change page permissions before trying to read?
def read(self, lpBaseAddress, nSize):
"""
Reads from the memory of the process.
@see: L{peek}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@type nSize: int
@param nSize: Number of bytes to read.
@rtype: str
@return: Bytes read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
hProcess = self.get_handle( win32.PROCESS_VM_READ |
win32.PROCESS_QUERY_INFORMATION )
if not self.is_buffer(lpBaseAddress, nSize):
raise ctypes.WinError(win32.ERROR_INVALID_ADDRESS)
data = win32.ReadProcessMemory(hProcess, lpBaseAddress, nSize)
if len(data) != nSize:
raise ctypes.WinError()
return data
def write(self, lpBaseAddress, lpBuffer):
"""
Writes to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type lpBuffer: str
@param lpBuffer: Bytes to write.
@raise WindowsError: On error an exception is raised.
"""
r = self.poke(lpBaseAddress, lpBuffer)
if r != len(lpBuffer):
raise ctypes.WinError()
def read_char(self, lpBaseAddress):
"""
Reads a single character to the memory of the process.
@see: L{peek_char}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@rtype: int
@return: Character value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return ord( self.read(lpBaseAddress, 1) )
def write_char(self, lpBaseAddress, char):
"""
Writes a single character to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_char}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type char: int
@param char: Character to write.
@raise WindowsError: On error an exception is raised.
"""
self.write(lpBaseAddress, chr(char))
def read_int(self, lpBaseAddress):
"""
Reads a signed integer from the memory of the process.
@see: L{peek_int}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return self.__read_c_type(lpBaseAddress, '@l', ctypes.c_int)
def write_int(self, lpBaseAddress, unpackedValue):
"""
Writes a signed integer to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_int}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@raise WindowsError: On error an exception is raised.
"""
self.__write_c_type(lpBaseAddress, '@l', unpackedValue)
def read_uint(self, lpBaseAddress):
"""
Reads an unsigned integer from the memory of the process.
@see: L{peek_uint}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return self.__read_c_type(lpBaseAddress, '@L', ctypes.c_uint)
def write_uint(self, lpBaseAddress, unpackedValue):
"""
Writes an unsigned integer to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_uint}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@raise WindowsError: On error an exception is raised.
"""
self.__write_c_type(lpBaseAddress, '@L', unpackedValue)
def read_float(self, lpBaseAddress):
"""
Reads a float from the memory of the process.
@see: L{peek_float}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Floating point value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return self.__read_c_type(lpBaseAddress, '@f', ctypes.c_float)
def write_float(self, lpBaseAddress, unpackedValue):
"""
Writes a float to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_float}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Floating point value to write.
@raise WindowsError: On error an exception is raised.
"""
self.__write_c_type(lpBaseAddress, '@f', unpackedValue)
def read_double(self, lpBaseAddress):
"""
Reads a double from the memory of the process.
@see: L{peek_double}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Floating point value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return self.__read_c_type(lpBaseAddress, '@d', ctypes.c_double)
def write_double(self, lpBaseAddress, unpackedValue):
"""
Writes a double to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_double}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Floating point value to write.
@raise WindowsError: On error an exception is raised.
"""
self.__write_c_type(lpBaseAddress, '@d', unpackedValue)
def read_pointer(self, lpBaseAddress):
"""
Reads a pointer value from the memory of the process.
@see: L{peek_pointer}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Pointer value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return self.__read_c_type(lpBaseAddress, '@P', ctypes.c_void_p)
def write_pointer(self, lpBaseAddress, unpackedValue):
"""
Writes a pointer value to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_pointer}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@raise WindowsError: On error an exception is raised.
"""
self.__write_c_type(lpBaseAddress, '@P', unpackedValue)
def read_dword(self, lpBaseAddress):
"""
Reads a DWORD from the memory of the process.
@see: L{peek_dword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return self.__read_c_type(lpBaseAddress, '=L', win32.DWORD)
def write_dword(self, lpBaseAddress, unpackedValue):
"""
Writes a DWORD to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_dword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@raise WindowsError: On error an exception is raised.
"""
self.__write_c_type(lpBaseAddress, '=L', unpackedValue)
def read_qword(self, lpBaseAddress):
"""
Reads a QWORD from the memory of the process.
@see: L{peek_qword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
return self.__read_c_type(lpBaseAddress, '=Q', win32.QWORD)
def write_qword(self, lpBaseAddress, unpackedValue):
"""
Writes a QWORD to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{poke_qword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@raise WindowsError: On error an exception is raised.
"""
self.__write_c_type(lpBaseAddress, '=Q', unpackedValue)
def read_structure(self, lpBaseAddress, stype):
"""
Reads a ctypes structure from the memory of the process.
@see: L{read}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@type stype: class ctypes.Structure or a subclass.
@param stype: Structure definition.
@rtype: int
@return: Structure instance filled in with data
read from the process memory.
@raise WindowsError: On error an exception is raised.
"""
if type(lpBaseAddress) not in (type(0), type(0L)):
lpBaseAddress = ctypes.cast(lpBaseAddress, ctypes.c_void_p)
data = self.read(lpBaseAddress, ctypes.sizeof(stype))
buff = ctypes.create_string_buffer(data)
ptr = ctypes.cast(ctypes.pointer(buff), ctypes.POINTER(stype))
return ptr.contents
# XXX TODO
## def write_structure(self, lpBaseAddress, sStructure):
## """
## Writes a ctypes structure into the memory of the process.
##
## @note: Page permissions may be changed temporarily while writing.
##
## @see: L{write}
##
## @type lpBaseAddress: int
## @param lpBaseAddress: Memory address to begin writing.
##
## @type sStructure: ctypes.Structure or a subclass' instance.
## @param sStructure: Structure definition.
##
## @rtype: int
## @return: Structure instance filled in with data
## read from the process memory.
##
## @raise WindowsError: On error an exception is raised.
## """
## size = ctypes.sizeof(sStructure)
## data = ctypes.create_string_buffer("", size = size)
## win32.CopyMemory(ctypes.byref(data), ctypes.byref(sStructure), size)
## self.write(lpBaseAddress, data.raw)
def read_string(self, lpBaseAddress, nChars, fUnicode = False):
"""
Reads an ASCII or Unicode string
from the address space of the process.
@see: L{peek_string}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@type nChars: int
@param nChars: String length to read, in characters.
Remember that Unicode strings have two byte characters.
@type fUnicode: bool
@param fUnicode: C{True} is the string is expected to be Unicode,
C{False} if it's expected to be ANSI.
@rtype: str, unicode
@return: String read from the process memory space.
@raise WindowsError: On error an exception is raised.
"""
if fUnicode:
nChars = nChars * 2
szString = self.read(lpBaseAddress, nChars)
if fUnicode:
szString = unicode(szString, 'U16', 'ignore')
return szString
#------------------------------------------------------------------------------
# FIXME this won't work properly with a different endianness!
def __peek_c_type(self, address, format, c_type):
size = ctypes.sizeof(c_type)
packed = self.peek(address, size)
if len(packed) < size:
packed = '\0' * (size - len(packed)) + packed
elif len(packed) > size:
packed = packed[:size]
return struct.unpack(format, packed)[0]
def __poke_c_type(self, address, format, unpacked):
packed = struct.pack('@L', unpacked)
return self.poke(address, packed)
def peek(self, lpBaseAddress, nSize):
"""
Reads the memory of the process.
@see: L{read}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@type nSize: int
@param nSize: Number of bytes to read.
@rtype: str
@return: Bytes read from the process memory.
Returns an empty string on error.
"""
# XXX TODO
# + Maybe change page permissions before trying to read?
# + Maybe use mquery instead of get_memory_map?
# (less syscalls if we break out of the loop earlier)
data = ''
if nSize > 0:
try:
hProcess = self.get_handle( win32.PROCESS_VM_READ |
win32.PROCESS_QUERY_INFORMATION )
for mbi in self.get_memory_map(lpBaseAddress,
lpBaseAddress + nSize):
if not mbi.is_readable():
nSize = mbi.BaseAddress - lpBaseAddress
break
if nSize > 0:
data = win32.ReadProcessMemory(
hProcess, lpBaseAddress, nSize)
except WindowsError, e:
msg = "Error reading process %d address %s: %s"
msg %= (self.get_pid(),
HexDump.address(lpBaseAddress),
e.strerror)
warnings.warn(msg)
return data
def poke(self, lpBaseAddress, lpBuffer):
"""
Writes to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type lpBuffer: str
@param lpBuffer: Bytes to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
hProcess = self.get_handle( win32.PROCESS_VM_WRITE |
win32.PROCESS_VM_OPERATION |
win32.PROCESS_QUERY_INFORMATION )
mbi = self.mquery(lpBaseAddress)
if not mbi.has_content():
raise ctypes.WinError(win32.ERROR_INVALID_ADDRESS)
if mbi.is_image() or mbi.is_mapped():
prot = win32.PAGE_WRITECOPY
elif mbi.is_writeable():
prot = None
elif mbi.is_executable():
prot = win32.PAGE_EXECUTE_READWRITE
else:
prot = win32.PAGE_READWRITE
if prot is not None:
try:
self.mprotect(lpBaseAddress, len(lpBuffer), prot)
except Exception:
prot = None
msg = ("Failed to adjust page permissions"
" for process %s at address %s: %s")
msg = msg % (self.get_pid(),
HexDump.address(lpBaseAddress, self.get_bits()),
traceback.format_exc())
warnings.warn(msg, RuntimeWarning)
try:
r = win32.WriteProcessMemory(hProcess, lpBaseAddress, lpBuffer)
finally:
if prot is not None:
self.mprotect(lpBaseAddress, len(lpBuffer), mbi.Protect)
return r
def peek_char(self, lpBaseAddress):
"""
Reads a single character from the memory of the process.
@see: L{read_char}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Character read from the process memory.
Returns zero on error.
"""
char = self.peek(lpBaseAddress, 1)
if char:
return ord(char)
return 0
def poke_char(self, lpBaseAddress, char):
"""
Writes a single character to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_char}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type char: str
@param char: Character to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.poke(lpBaseAddress, chr(char))
def peek_int(self, lpBaseAddress):
"""
Reads a signed integer from the memory of the process.
@see: L{read_int}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
Returns zero on error.
"""
return self.__peek_c_type(lpBaseAddress, '@l', ctypes.c_int)
def poke_int(self, lpBaseAddress, unpackedValue):
"""
Writes a signed integer to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_int}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.__poke_c_type(lpBaseAddress, '@l', unpackedValue)
def peek_uint(self, lpBaseAddress):
"""
Reads an unsigned integer from the memory of the process.
@see: L{read_uint}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
Returns zero on error.
"""
return self.__peek_c_type(lpBaseAddress, '@L', ctypes.c_uint)
def poke_uint(self, lpBaseAddress, unpackedValue):
"""
Writes an unsigned integer to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_uint}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.__poke_c_type(lpBaseAddress, '@L', unpackedValue)
def peek_float(self, lpBaseAddress):
"""
Reads a float from the memory of the process.
@see: L{read_float}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
Returns zero on error.
"""
return self.__peek_c_type(lpBaseAddress, '@f', ctypes.c_float)
def poke_float(self, lpBaseAddress, unpackedValue):
"""
Writes a float to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_float}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.__poke_c_type(lpBaseAddress, '@f', unpackedValue)
def peek_double(self, lpBaseAddress):
"""
Reads a double from the memory of the process.
@see: L{read_double}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
Returns zero on error.
"""
return self.__peek_c_type(lpBaseAddress, '@d', ctypes.c_double)
def poke_double(self, lpBaseAddress, unpackedValue):
"""
Writes a double to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_double}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.__poke_c_type(lpBaseAddress, '@d', unpackedValue)
def peek_dword(self, lpBaseAddress):
"""
Reads a DWORD from the memory of the process.
@see: L{read_dword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
Returns zero on error.
"""
return self.__peek_c_type(lpBaseAddress, '=L', win32.DWORD)
def poke_dword(self, lpBaseAddress, unpackedValue):
"""
Writes a DWORD to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_dword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.__poke_c_type(lpBaseAddress, '=L', unpackedValue)
def peek_qword(self, lpBaseAddress):
"""
Reads a QWORD from the memory of the process.
@see: L{read_qword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Integer value read from the process memory.
Returns zero on error.
"""
return self.__peek_c_type(lpBaseAddress, '=Q', win32.QWORD)
def poke_qword(self, lpBaseAddress, unpackedValue):
"""
Writes a QWORD to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_qword}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.__poke_c_type(lpBaseAddress, '=Q', unpackedValue)
def peek_pointer(self, lpBaseAddress):
"""
Reads a pointer value from the memory of the process.
@see: L{read_pointer}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@rtype: int
@return: Pointer value read from the process memory.
Returns zero on error.
"""
return self.__peek_c_type(lpBaseAddress, '@P', ctypes.c_void_p)
def poke_pointer(self, lpBaseAddress, unpackedValue):
"""
Writes a pointer value to the memory of the process.
@note: Page permissions may be changed temporarily while writing.
@see: L{write_pointer}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin writing.
@type unpackedValue: int, long
@param unpackedValue: Value to write.
@rtype: int
@return: Number of bytes written.
May be less than the number of bytes to write.
"""
return self.__poke_c_type(lpBaseAddress, '@P', unpackedValue)
def peek_string(self, lpBaseAddress, fUnicode = False, dwMaxSize = 0x1000):
"""
Tries to read an ASCII or Unicode string
from the address space of the process.
@see: L{read_string}
@type lpBaseAddress: int
@param lpBaseAddress: Memory address to begin reading.
@type fUnicode: bool
@param fUnicode: C{True} is the string is expected to be Unicode,
C{False} if it's expected to be ANSI.
@type dwMaxSize: int
@param dwMaxSize: Maximum allowed string length to read, in bytes.
@rtype: str, unicode
@return: String read from the process memory space.
It B{doesn't} include the terminating null character.
Returns an empty string on failure.
"""
# Validate the parameters.
if not lpBaseAddress or dwMaxSize == 0:
if fUnicode:
return u''
return ''
if not dwMaxSize:
dwMaxSize = 0x1000
# Read the string.
szString = self.peek(lpBaseAddress, dwMaxSize)
# If the string is Unicode...
if fUnicode:
# Decode the string.
szString = unicode(szString, 'U16', 'replace')
## try:
## szString = unicode(szString, 'U16')
## except UnicodeDecodeError:
## szString = struct.unpack('H' * (len(szString) / 2), szString)
## szString = [ unichr(c) for c in szString ]
## szString = u''.join(szString)
# Truncate the string when the first null char is found.
szString = szString[ : szString.find(u'\0') ]
# If the string is ANSI...
else:
# Truncate the string when the first null char is found.
szString = szString[ : szString.find('\0') ]
# Return the decoded string.
return szString
# TODO
# try to avoid reading the same page twice by caching it
def peek_pointers_in_data(self, data, peekSize = 16, peekStep = 1):
"""
Tries to guess which values in the given data are valid pointers,
and reads some data from them.
@see: L{peek}
@type data: str
@param data: Binary data to find pointers in.
@type peekSize: int
@param peekSize: Number of bytes to read from each pointer found.
@type peekStep: int
@param peekStep: Expected data alignment.
Typically you specify 1 when data alignment is unknown,
or 4 when you expect data to be DWORD aligned.
Any other value may be specified.
@rtype: dict( str S{->} str )
@return: Dictionary mapping stack offsets to the data they point to.
"""
result = dict()
ptrSize = win32.sizeof(win32.LPVOID)
if ptrSize == 4:
ptrFmt = '<L'
else:
ptrFmt = '<Q'
if len(data) > 0:
for i in xrange(0, len(data), peekStep):
packed = data[i:i+ptrSize]
if len(packed) == ptrSize:
address = struct.unpack(ptrFmt, packed)[0]
## if not address & (~0xFFFF): continue
peek_data = self.peek(address, peekSize)
if peek_data:
result[i] = peek_data
return result
#------------------------------------------------------------------------------
def malloc(self, dwSize, lpAddress = None):
"""
Allocates memory into the address space of the process.
@see: L{free}
@type dwSize: int
@param dwSize: Number of bytes to allocate.
@type lpAddress: int
@param lpAddress: (Optional)
Desired address for the newly allocated memory.
This is only a hint, the memory could still be allocated somewhere
else.
@rtype: int
@return: Address of the newly allocated memory.
@raise WindowsError: On error an exception is raised.
"""
hProcess = self.get_handle(win32.PROCESS_VM_OPERATION)
return win32.VirtualAllocEx(hProcess, lpAddress, dwSize)
def mprotect(self, lpAddress, dwSize, flNewProtect):
"""
Set memory protection in the address space of the process.
@see: U{http://msdn.microsoft.com/en-us/library/aa366899.aspx}
@type lpAddress: int
@param lpAddress: Address of memory to protect.
@type dwSize: int
@param dwSize: Number of bytes to protect.
@type flNewProtect: int
@param flNewProtect: New protect flags.
@rtype: int
@return: Old protect flags.
@raise WindowsError: On error an exception is raised.
"""
hProcess = self.get_handle(win32.PROCESS_VM_OPERATION)
return win32.VirtualProtectEx(hProcess, lpAddress, dwSize, flNewProtect)
def mquery(self, lpAddress):
"""
Query memory information from the address space of the process.
Returns a L{win32.MemoryBasicInformation} object.
@see: U{http://msdn.microsoft.com/en-us/library/aa366907(VS.85).aspx}
@type lpAddress: int
@param lpAddress: Address of memory to query.
@rtype: L{win32.MemoryBasicInformation}
@return: Memory region information.
@raise WindowsError: On error an exception is raised.
"""
hProcess = self.get_handle(win32.PROCESS_QUERY_INFORMATION)
return win32.VirtualQueryEx(hProcess, lpAddress)
def free(self, lpAddress):
"""
Frees memory from the address space of the process.
@see: U{http://msdn.microsoft.com/en-us/library/aa366894(v=vs.85).aspx}
@type lpAddress: int
@param lpAddress: Address of memory to free.
Must be the base address returned by L{malloc}.
@raise WindowsError: On error an exception is raised.
"""
hProcess = self.get_handle(win32.PROCESS_VM_OPERATION)
win32.VirtualFreeEx(hProcess, lpAddress)
#------------------------------------------------------------------------------
def is_pointer(self, address):
"""
Determines if an address is a valid code or data pointer.
That is, the address must be valid and must point to code or data in
the target process.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return: C{True} if the address is a valid code or data pointer.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.has_content()
def is_address_valid(self, address):
"""
Determines if an address is a valid user mode address.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return: C{True} if the address is a valid user mode address.
@raise WindowsError: An exception is raised on error.
"""
try:
self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return True
def is_address_free(self, address):
"""
Determines if an address belongs to a free page.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return: C{True} if the address belongs to a free page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_free()
def is_address_reserved(self, address):
"""
Determines if an address belongs to a reserved page.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return: C{True} if the address belongs to a reserved page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_reserved()
def is_address_commited(self, address):
"""
Determines if an address belongs to a commited page.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return: C{True} if the address belongs to a commited page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_commited()
def is_address_guard(self, address):
"""
Determines if an address belongs to a guard page.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return: C{True} if the address belongs to a guard page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_guard()
def is_address_readable(self, address):
"""
Determines if an address belongs to a commited and readable page.
The page may or may not have additional permissions.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return:
C{True} if the address belongs to a commited and readable page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_readable()
def is_address_writeable(self, address):
"""
Determines if an address belongs to a commited and writeable page.
The page may or may not have additional permissions.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return:
C{True} if the address belongs to a commited and writeable page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_writeable()
def is_address_copy_on_write(self, address):
"""
Determines if an address belongs to a commited, copy-on-write page.
The page may or may not have additional permissions.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return:
C{True} if the address belongs to a commited, copy-on-write page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_copy_on_write()
def is_address_executable(self, address):
"""
Determines if an address belongs to a commited and executable page.
The page may or may not have additional permissions.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return:
C{True} if the address belongs to a commited and executable page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_executable()
def is_address_executable_and_writeable(self, address):
"""
Determines if an address belongs to a commited, writeable and
executable page. The page may or may not have additional permissions.
Looking for writeable and executable pages is important when
exploiting a software vulnerability.
@note: Returns always C{False} for kernel mode addresses.
@type address: int
@param address: Memory address to query.
@rtype: bool
@return:
C{True} if the address belongs to a commited, writeable and
executable page.
@raise WindowsError: An exception is raised on error.
"""
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
return mbi.is_executable_and_writeable()
def is_buffer(self, address, size):
"""
Determines if the given memory area is a valid code or data buffer.
@note: Returns always C{False} for kernel mode addresses.
@see: L{mquery}
@type address: int
@param address: Memory address.
@type size: int
@param size: Number of bytes. Must be greater than zero.
@rtype: bool
@return: C{True} if the memory area is a valid code or data buffer,
C{False} otherwise.
@raise ValueError: The size argument must be greater than zero.
@raise WindowsError: On error an exception is raised.
"""
if size <= 0:
raise ValueError("The size argument must be greater than zero")
while size > 0:
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
if not mbi.has_content():
return False
size = size - mbi.RegionSize
return True
def is_buffer_readable(self, address, size):
"""
Determines if the given memory area is readable.
@note: Returns always C{False} for kernel mode addresses.
@see: L{mquery}
@type address: int
@param address: Memory address.
@type size: int
@param size: Number of bytes. Must be greater than zero.
@rtype: bool
@return: C{True} if the memory area is readable, C{False} otherwise.
@raise ValueError: The size argument must be greater than zero.
@raise WindowsError: On error an exception is raised.
"""
if size <= 0:
raise ValueError("The size argument must be greater than zero")
while size > 0:
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
if not mbi.is_readable():
return False
size = size - mbi.RegionSize
return True
def is_buffer_writeable(self, address, size):
"""
Determines if the given memory area is writeable.
@note: Returns always C{False} for kernel mode addresses.
@see: L{mquery}
@type address: int
@param address: Memory address.
@type size: int
@param size: Number of bytes. Must be greater than zero.
@rtype: bool
@return: C{True} if the memory area is writeable, C{False} otherwise.
@raise ValueError: The size argument must be greater than zero.
@raise WindowsError: On error an exception is raised.
"""
if size <= 0:
raise ValueError("The size argument must be greater than zero")
while size > 0:
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
if not mbi.is_writeable():
return False
size = size - mbi.RegionSize
return True
def is_buffer_copy_on_write(self, address, size):
"""
Determines if the given memory area is marked as copy-on-write.
@note: Returns always C{False} for kernel mode addresses.
@see: L{mquery}
@type address: int
@param address: Memory address.
@type size: int
@param size: Number of bytes. Must be greater than zero.
@rtype: bool
@return: C{True} if the memory area is marked as copy-on-write,
C{False} otherwise.
@raise ValueError: The size argument must be greater than zero.
@raise WindowsError: On error an exception is raised.
"""
if size <= 0:
raise ValueError("The size argument must be greater than zero")
while size > 0:
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
if not mbi.is_copy_on_write():
return False
size = size - mbi.RegionSize
return True
def is_buffer_executable(self, address, size):
"""
Determines if the given memory area is executable.
@note: Returns always C{False} for kernel mode addresses.
@see: L{mquery}
@type address: int
@param address: Memory address.
@type size: int
@param size: Number of bytes. Must be greater than zero.
@rtype: bool
@return: C{True} if the memory area is executable, C{False} otherwise.
@raise ValueError: The size argument must be greater than zero.
@raise WindowsError: On error an exception is raised.
"""
if size <= 0:
raise ValueError("The size argument must be greater than zero")
while size > 0:
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
if not mbi.is_executable():
return False
size = size - mbi.RegionSize
return True
def is_buffer_executable_and_writeable(self, address, size):
"""
Determines if the given memory area is writeable and executable.
Looking for writeable and executable pages is important when
exploiting a software vulnerability.
@note: Returns always C{False} for kernel mode addresses.
@see: L{mquery}
@type address: int
@param address: Memory address.
@type size: int
@param size: Number of bytes. Must be greater than zero.
@rtype: bool
@return: C{True} if the memory area is writeable and executable,
C{False} otherwise.
@raise ValueError: The size argument must be greater than zero.
@raise WindowsError: On error an exception is raised.
"""
if size <= 0:
raise ValueError("The size argument must be greater than zero")
while size > 0:
try:
mbi = self.mquery(address)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
return False
raise
if not mbi.is_executable():
return False
size = size - mbi.RegionSize
return True
def get_memory_map(self, minAddr = None, maxAddr = None):
"""
Produces a memory map to the process address space.
Optionally restrict the map to the given address range.
@see: L{mquery}
@type minAddr: int
@param minAddr: (Optional) Starting address in address range to query.
@type maxAddr: int
@param maxAddr: (Optional) Ending address in address range to query.
@rtype: list( L{win32.MemoryBasicInformation} )
@return: List of memory region information objects.
"""
return list(self.iter_memory_map(minAddr, maxAddr))
def generate_memory_map(self, minAddr = None, maxAddr = None):
"""
Returns a L{Regenerator} that can iterate indefinitely over the memory
map to the process address space.
Optionally restrict the map to the given address range.
@see: L{mquery}
@type minAddr: int
@param minAddr: (Optional) Starting address in address range to query.
@type maxAddr: int
@param maxAddr: (Optional) Ending address in address range to query.
@rtype: L{Regenerator} of L{win32.MemoryBasicInformation}
@return: List of memory region information objects.
"""
return Regenerator(self.iter_memory_map, minAddr, maxAddr)
def iter_memory_map(self, minAddr = None, maxAddr = None):
"""
Produces an iterator over the memory map to the process address space.
Optionally restrict the map to the given address range.
@see: L{mquery}
@type minAddr: int
@param minAddr: (Optional) Starting address in address range to query.
@type maxAddr: int
@param maxAddr: (Optional) Ending address in address range to query.
@rtype: iterator of L{win32.MemoryBasicInformation}
@return: List of memory region information objects.
"""
minAddr, maxAddr = MemoryAddresses.align_address_range(minAddr,maxAddr)
prevAddr = minAddr - 1
currentAddr = minAddr
while prevAddr < currentAddr < maxAddr:
try:
mbi = self.mquery(currentAddr)
except WindowsError, e:
if e.winerror == win32.ERROR_INVALID_PARAMETER:
break
raise
yield mbi
prevAddr = currentAddr
currentAddr = mbi.BaseAddress + mbi.RegionSize
def get_mapped_filenames(self, memoryMap = None):
"""
Retrieves the filenames for memory mapped files in the debugee.
@type memoryMap: list( L{win32.MemoryBasicInformation} )
@param memoryMap: (Optional) Memory map returned by L{get_memory_map}.
If not given, the current memory map is used.
@rtype: dict( int S{->} str )
@return: Dictionary mapping memory addresses to file names.
Native filenames are converted to Win32 filenames when possible.
"""
hProcess = self.get_handle( win32.PROCESS_VM_READ |
win32.PROCESS_QUERY_INFORMATION )
if not memoryMap:
memoryMap = self.get_memory_map()
mappedFilenames = dict()
for mbi in memoryMap:
if mbi.Type not in (win32.MEM_IMAGE, win32.MEM_MAPPED):
continue
baseAddress = mbi.BaseAddress
fileName = ""
try:
fileName = win32.GetMappedFileName(hProcess, baseAddress)
fileName = PathOperations.native_to_win32_pathname(fileName)
except WindowsError, e: # NOQA
#try:
# msg = "Can't get mapped file name at address %s in process " \
# "%d, reason: %s" % (HexDump.address(baseAddress),
# self.get_pid(),
# e.strerror)
# warnings.warn(msg, Warning)
#except Exception:
pass
mappedFilenames[baseAddress] = fileName
return mappedFilenames
def generate_memory_snapshot(self, minAddr = None, maxAddr = None):
"""
Returns a L{Regenerator} that allows you to iterate through the memory
contents of a process indefinitely.
It's basically the same as the L{take_memory_snapshot} method, but it
takes the snapshot of each memory region as it goes, as opposed to
taking the whole snapshot at once. This allows you to work with very
large snapshots without a significant performance penalty.
Example::
# Print the memory contents of a process.
process.suspend()
try:
snapshot = process.generate_memory_snapshot()
for mbi in snapshot:
print HexDump.hexblock(mbi.content, mbi.BaseAddress)
finally:
process.resume()
The downside of this is the process must remain suspended while
iterating the snapshot, otherwise strange things may happen.
The snapshot can be iterated more than once. Each time it's iterated
the memory contents of the process will be fetched again.
You can also iterate the memory of a dead process, just as long as the
last open handle to it hasn't been closed.
@see: L{take_memory_snapshot}
@type minAddr: int
@param minAddr: (Optional) Starting address in address range to query.
@type maxAddr: int
@param maxAddr: (Optional) Ending address in address range to query.
@rtype: L{Regenerator} of L{win32.MemoryBasicInformation}
@return: Generator that when iterated returns memory region information
objects. Two extra properties are added to these objects:
- C{filename}: Mapped filename, or C{None}.
- C{content}: Memory contents, or C{None}.
"""
return Regenerator(self.iter_memory_snapshot, minAddr, maxAddr)
def iter_memory_snapshot(self, minAddr = None, maxAddr = None):
"""
Returns an iterator that allows you to go through the memory contents
of a process.
It's basically the same as the L{take_memory_snapshot} method, but it
takes the snapshot of each memory region as it goes, as opposed to
taking the whole snapshot at once. This allows you to work with very
large snapshots without a significant performance penalty.
Example::
# Print the memory contents of a process.
process.suspend()
try:
snapshot = process.generate_memory_snapshot()
for mbi in snapshot:
print HexDump.hexblock(mbi.content, mbi.BaseAddress)
finally:
process.resume()
The downside of this is the process must remain suspended while
iterating the snapshot, otherwise strange things may happen.
The snapshot can only iterated once. To be able to iterate indefinitely
call the L{generate_memory_snapshot} method instead.
You can also iterate the memory of a dead process, just as long as the
last open handle to it hasn't been closed.
@see: L{take_memory_snapshot}
@type minAddr: int
@param minAddr: (Optional) Starting address in address range to query.
@type maxAddr: int
@param maxAddr: (Optional) Ending address in address range to query.
@rtype: iterator of L{win32.MemoryBasicInformation}
@return: Iterator of memory region information objects.
Two extra properties are added to these objects:
- C{filename}: Mapped filename, or C{None}.
- C{content}: Memory contents, or C{None}.
"""
# One may feel tempted to include calls to self.suspend() and
# self.resume() here, but that wouldn't work on a dead process.
# It also wouldn't be needed when debugging since the process is
# already suspended when the debug event arrives. So it's up to
# the user to suspend the process if needed.
# Get the memory map.
memory = self.get_memory_map(minAddr, maxAddr)
# Abort if the map couldn't be retrieved.
if not memory:
return
# Get the mapped filenames.
# Don't fail on access denied errors.
try:
filenames = self.get_mapped_filenames(memory)
except WindowsError, e:
if e.winerror != win32.ERROR_ACCESS_DENIED:
raise
filenames = dict()
# Trim the first memory information block if needed.
if minAddr is not None:
minAddr = MemoryAddresses.align_address_to_page_start(minAddr)
mbi = memory[0]
if mbi.BaseAddress < minAddr:
mbi.RegionSize = mbi.BaseAddress + mbi.RegionSize - minAddr
mbi.BaseAddress = minAddr
# Trim the last memory information block if needed.
if maxAddr is not None:
if maxAddr != MemoryAddresses.align_address_to_page_start(maxAddr):
maxAddr = MemoryAddresses.align_address_to_page_end(maxAddr)
mbi = memory[-1]
if mbi.BaseAddress + mbi.RegionSize > maxAddr:
mbi.RegionSize = maxAddr - mbi.BaseAddress
# Read the contents of each block and yield it.
while memory:
mbi = memory.pop(0) # so the garbage collector can take it
mbi.filename = filenames.get(mbi.BaseAddress, None)
if mbi.has_content():
mbi.content = self.read(mbi.BaseAddress, mbi.RegionSize)
else:
mbi.content = None
yield mbi
def take_memory_snapshot(self, minAddr = None, maxAddr = None):
"""
Takes a snapshot of the memory contents of the process.
It's best if the process is suspended (if alive) when taking the
snapshot. Execution can be resumed afterwards.
Example::
# Print the memory contents of a process.
process.suspend()
try:
snapshot = process.take_memory_snapshot()
for mbi in snapshot:
print HexDump.hexblock(mbi.content, mbi.BaseAddress)
finally:
process.resume()
You can also iterate the memory of a dead process, just as long as the
last open handle to it hasn't been closed.
@warning: If the target process has a very big memory footprint, the
resulting snapshot will be equally big. This may result in a severe
performance penalty.
@see: L{generate_memory_snapshot}
@type minAddr: int
@param minAddr: (Optional) Starting address in address range to query.
@type maxAddr: int
@param maxAddr: (Optional) Ending address in address range to query.
@rtype: list( L{win32.MemoryBasicInformation} )
@return: List of memory region information objects.
Two extra properties are added to these objects:
- C{filename}: Mapped filename, or C{None}.
- C{content}: Memory contents, or C{None}.
"""
return list( self.iter_memory_snapshot(minAddr, maxAddr) )
def restore_memory_snapshot(self, snapshot,
bSkipMappedFiles = True,
bSkipOnError = False):
"""
Attempts to restore the memory state as it was when the given snapshot
was taken.
@warning: Currently only the memory contents, state and protect bits
are restored. Under some circumstances this method may fail (for
example if memory was freed and then reused by a mapped file).
@type snapshot: list( L{win32.MemoryBasicInformation} )
@param snapshot: Memory snapshot returned by L{take_memory_snapshot}.
Snapshots returned by L{generate_memory_snapshot} don't work here.
@type bSkipMappedFiles: bool
@param bSkipMappedFiles: C{True} to avoid restoring the contents of
memory mapped files, C{False} otherwise. Use with care! Setting
this to C{False} can cause undesired side effects - changes to
memory mapped files may be written to disk by the OS. Also note
that most mapped files are typically executables and don't change,
so trying to restore their contents is usually a waste of time.
@type bSkipOnError: bool
@param bSkipOnError: C{True} to issue a warning when an error occurs
during the restoration of the snapshot, C{False} to stop and raise
an exception instead. Use with care! Setting this to C{True} will
cause the debugger to falsely believe the memory snapshot has been
correctly restored.
@raise WindowsError: An error occured while restoring the snapshot.
@raise RuntimeError: An error occured while restoring the snapshot.
@raise TypeError: A snapshot of the wrong type was passed.
"""
if not snapshot or not isinstance(snapshot, list) \
or not isinstance(snapshot[0], win32.MemoryBasicInformation):
raise TypeError( "Only snapshots returned by " \
"take_memory_snapshot() can be used here." )
# Get the process handle.
hProcess = self.get_handle( win32.PROCESS_VM_WRITE |
win32.PROCESS_VM_OPERATION |
win32.PROCESS_SUSPEND_RESUME |
win32.PROCESS_QUERY_INFORMATION )
# Freeze the process.
self.suspend()
try:
# For each memory region in the snapshot...
for old_mbi in snapshot:
# If the region matches, restore it directly.
new_mbi = self.mquery(old_mbi.BaseAddress)
if new_mbi.BaseAddress == old_mbi.BaseAddress and \
new_mbi.RegionSize == old_mbi.RegionSize:
self.__restore_mbi(hProcess, new_mbi, old_mbi,
bSkipMappedFiles, bSkipOnError)
# If the region doesn't match, restore it page by page.
else:
# We need a copy so we don't corrupt the snapshot.
old_mbi = win32.MemoryBasicInformation(old_mbi)
# Get the overlapping range of pages.
old_start = old_mbi.BaseAddress
old_end = old_start + old_mbi.RegionSize
new_start = new_mbi.BaseAddress
new_end = new_start + new_mbi.RegionSize
if old_start > new_start:
start = old_start
else:
start = new_start
if old_end < new_end:
end = old_end
else:
end = new_end
# Restore each page in the overlapping range.
step = MemoryAddresses.pageSize
old_mbi.RegionSize = step
new_mbi.RegionSize = step
address = start
while address < end:
old_mbi.BaseAddress = address
new_mbi.BaseAddress = address
self.__restore_mbi(hProcess, new_mbi, old_mbi,
bSkipMappedFiles, bSkipOnError)
address = address + step
# Resume execution.
finally:
self.resume()
def __restore_mbi(self, hProcess, new_mbi, old_mbi, bSkipMappedFiles,
bSkipOnError):
"""
Used internally by L{restore_memory_snapshot}.
"""
## print "Restoring %s-%s" % (
## HexDump.address(old_mbi.BaseAddress, self.get_bits()),
## HexDump.address(old_mbi.BaseAddress + old_mbi.RegionSize,
## self.get_bits()))
try:
# Restore the region state.
if new_mbi.State != old_mbi.State:
if new_mbi.is_free():
if old_mbi.is_reserved():
# Free -> Reserved
address = win32.VirtualAllocEx(hProcess,
old_mbi.BaseAddress,
old_mbi.RegionSize,
win32.MEM_RESERVE,
old_mbi.Protect)
if address != old_mbi.BaseAddress:
self.free(address)
msg = "Error restoring region at address %s"
msg = msg % HexDump(old_mbi.BaseAddress,
self.get_bits())
raise RuntimeError(msg)
# permissions already restored
new_mbi.Protect = old_mbi.Protect
else: # elif old_mbi.is_commited():
# Free -> Commited
address = win32.VirtualAllocEx(hProcess,
old_mbi.BaseAddress,
old_mbi.RegionSize,
win32.MEM_RESERVE | \
win32.MEM_COMMIT,
old_mbi.Protect)
if address != old_mbi.BaseAddress:
self.free(address)
msg = "Error restoring region at address %s"
msg = msg % HexDump(old_mbi.BaseAddress,
self.get_bits())
raise RuntimeError(msg)
# permissions already restored
new_mbi.Protect = old_mbi.Protect
elif new_mbi.is_reserved():
if old_mbi.is_commited():
# Reserved -> Commited
address = win32.VirtualAllocEx(hProcess,
old_mbi.BaseAddress,
old_mbi.RegionSize,
win32.MEM_COMMIT,
old_mbi.Protect)
if address != old_mbi.BaseAddress:
self.free(address)
msg = "Error restoring region at address %s"
msg = msg % HexDump(old_mbi.BaseAddress,
self.get_bits())
raise RuntimeError(msg)
# permissions already restored
new_mbi.Protect = old_mbi.Protect
else: # elif old_mbi.is_free():
# Reserved -> Free
win32.VirtualFreeEx(hProcess,
old_mbi.BaseAddress,
old_mbi.RegionSize,
win32.MEM_RELEASE)
else: # elif new_mbi.is_commited():
if old_mbi.is_reserved():
# Commited -> Reserved
win32.VirtualFreeEx(hProcess,
old_mbi.BaseAddress,
old_mbi.RegionSize,
win32.MEM_DECOMMIT)
else: # elif old_mbi.is_free():
# Commited -> Free
win32.VirtualFreeEx(hProcess,
old_mbi.BaseAddress,
old_mbi.RegionSize,
win32.MEM_DECOMMIT | win32.MEM_RELEASE)
new_mbi.State = old_mbi.State
# Restore the region permissions.
if old_mbi.is_commited() and old_mbi.Protect != new_mbi.Protect:
win32.VirtualProtectEx(hProcess, old_mbi.BaseAddress,
old_mbi.RegionSize, old_mbi.Protect)
new_mbi.Protect = old_mbi.Protect
# Restore the region data.
# Ignore write errors when the region belongs to a mapped file.
if old_mbi.has_content():
if old_mbi.Type != 0:
if not bSkipMappedFiles:
self.poke(old_mbi.BaseAddress, old_mbi.content)
else:
self.write(old_mbi.BaseAddress, old_mbi.content)
new_mbi.content = old_mbi.content
# On error, skip this region or raise an exception.
except Exception:
if not bSkipOnError:
raise
msg = "Error restoring region at address %s: %s"
msg = msg % (
HexDump(old_mbi.BaseAddress, self.get_bits()),
traceback.format_exc())
warnings.warn(msg, RuntimeWarning)
#------------------------------------------------------------------------------
def inject_code(self, payload, lpParameter = 0):
"""
Injects relocatable code into the process memory and executes it.
@warning: Don't forget to free the memory when you're done with it!
Otherwise you'll be leaking memory in the target process.
@see: L{inject_dll}
@type payload: str
@param payload: Relocatable code to run in a new thread.
@type lpParameter: int
@param lpParameter: (Optional) Parameter to be pushed in the stack.
@rtype: tuple( L{Thread}, int )
@return: The injected Thread object
and the memory address where the code was written.
@raise WindowsError: An exception is raised on error.
"""
# Uncomment for debugging...
## payload = '\xCC' + payload
# Allocate the memory for the shellcode.
lpStartAddress = self.malloc(len(payload))
# Catch exceptions so we can free the memory on error.
try:
# Write the shellcode to our memory location.
self.write(lpStartAddress, payload)
# Start a new thread for the shellcode to run.
aThread = self.start_thread(lpStartAddress, lpParameter,
bSuspended = False)
# Remember the shellcode address.
# It will be freed ONLY by the Thread.kill() method
# and the EventHandler class, otherwise you'll have to
# free it in your code, or have your shellcode clean up
# after itself (recommended).
aThread.pInjectedMemory = lpStartAddress
# Free the memory on error.
except Exception:
self.free(lpStartAddress)
raise
# Return the Thread object and the shellcode address.
return aThread, lpStartAddress
# TODO
# The shellcode should check for errors, otherwise it just crashes
# when the DLL can't be loaded or the procedure can't be found.
# On error the shellcode should execute an int3 instruction.
def inject_dll(self, dllname, procname = None, lpParameter = 0,
bWait = True, dwTimeout = None):
"""
Injects a DLL into the process memory.
@warning: Setting C{bWait} to C{True} when the process is frozen by a
debug event will cause a deadlock in your debugger.
@warning: This involves allocating memory in the target process.
This is how the freeing of this memory is handled:
- If the C{bWait} flag is set to C{True} the memory will be freed
automatically before returning from this method.
- If the C{bWait} flag is set to C{False}, the memory address is
set as the L{Thread.pInjectedMemory} property of the returned
thread object.
- L{Debug} objects free L{Thread.pInjectedMemory} automatically
both when it detaches from a process and when the injected
thread finishes its execution.
- The {Thread.kill} method also frees L{Thread.pInjectedMemory}
automatically, even if you're not attached to the process.
You could still be leaking memory if not careful. For example, if
you inject a dll into a process you're not attached to, you don't
wait for the thread's completion and you don't kill it either, the
memory would be leaked.
@see: L{inject_code}
@type dllname: str
@param dllname: Name of the DLL module to load.
@type procname: str
@param procname: (Optional) Procedure to call when the DLL is loaded.
@type lpParameter: int
@param lpParameter: (Optional) Parameter to the C{procname} procedure.
@type bWait: bool
@param bWait: C{True} to wait for the process to finish.
C{False} to return immediately.
@type dwTimeout: int
@param dwTimeout: (Optional) Timeout value in milliseconds.
Ignored if C{bWait} is C{False}.
@rtype: L{Thread}
@return: Newly created thread object. If C{bWait} is set to C{True} the
thread will be dead, otherwise it will be alive.
@raise NotImplementedError: The target platform is not supported.
Currently calling a procedure in the library is only supported in
the I{i386} architecture.
@raise WindowsError: An exception is raised on error.
"""
# Resolve kernel32.dll
aModule = self.get_module_by_name('kernel32.dll')
if aModule is None:
self.scan_modules()
aModule = self.get_module_by_name('kernel32.dll')
if aModule is None:
raise RuntimeError(
"Cannot resolve kernel32.dll in the remote process")
# Old method, using shellcode.
if procname:
if self.get_arch() != win32.ARCH_I386:
raise NotImplementedError()
dllname = str(dllname)
# Resolve kernel32.dll!LoadLibraryA
pllib = aModule.resolve('LoadLibraryA')
if not pllib:
raise RuntimeError(
"Cannot resolve kernel32.dll!LoadLibraryA"
" in the remote process")
# Resolve kernel32.dll!GetProcAddress
pgpad = aModule.resolve('GetProcAddress')
if not pgpad:
raise RuntimeError(
"Cannot resolve kernel32.dll!GetProcAddress"
" in the remote process")
# Resolve kernel32.dll!VirtualFree
pvf = aModule.resolve('VirtualFree')
if not pvf:
raise RuntimeError(
"Cannot resolve kernel32.dll!VirtualFree"
" in the remote process")
# Shellcode follows...
code = ''.encode('latin1')
# push dllname
code += '\xe8' + struct.pack('<L', len(dllname) + 1) + dllname + '\0'
# mov eax, LoadLibraryA
code += '\xb8' + struct.pack('<L', pllib)
# call eax
code += '\xff\xd0'
if procname:
# push procname
code += '\xe8' + struct.pack('<L', len(procname) + 1)
code += procname + '\0'
# push eax
code += '\x50'
# mov eax, GetProcAddress
code += '\xb8' + struct.pack('<L', pgpad)
# call eax
code += '\xff\xd0'
# mov ebp, esp ; preserve stack pointer
code += '\x8b\xec'
# push lpParameter
code += '\x68' + struct.pack('<L', lpParameter)
# call eax
code += '\xff\xd0'
# mov esp, ebp ; restore stack pointer
code += '\x8b\xe5'
# pop edx ; our own return address
code += '\x5a'
# push MEM_RELEASE ; dwFreeType
code += '\x68' + struct.pack('<L', win32.MEM_RELEASE)
# push 0x1000 ; dwSize, shellcode max size 4096 bytes
code += '\x68' + struct.pack('<L', 0x1000)
# call $+5
code += '\xe8\x00\x00\x00\x00'
# and dword ptr [esp], 0xFFFFF000 ; align to page boundary
code += '\x81\x24\x24\x00\xf0\xff\xff'
# mov eax, VirtualFree
code += '\xb8' + struct.pack('<L', pvf)
# push edx ; our own return address
code += '\x52'
# jmp eax ; VirtualFree will return to our own return address
code += '\xff\xe0'
# Inject the shellcode.
# There's no need to free the memory,
# because the shellcode will free it itself.
aThread, lpStartAddress = self.inject_code(code, lpParameter)
# New method, not using shellcode.
else:
# Resolve kernel32.dll!LoadLibrary (A/W)
if type(dllname) == type(u''):
pllibname = 'LoadLibraryW'
bufferlen = (len(dllname) + 1) * 2
dllname = win32.ctypes.create_unicode_buffer(dllname).raw[:bufferlen + 1]
else:
pllibname = 'LoadLibraryA'
dllname = str(dllname) + '\x00'
bufferlen = len(dllname)
pllib = aModule.resolve(pllibname)
if not pllib:
msg = "Cannot resolve kernel32.dll!%s in the remote process"
raise RuntimeError(msg % pllibname)
# Copy the library name into the process memory space.
pbuffer = self.malloc(bufferlen)
try:
self.write(pbuffer, dllname)
# Create a new thread to load the library.
try:
aThread = self.start_thread(pllib, pbuffer)
except WindowsError, e:
if e.winerror != win32.ERROR_NOT_ENOUGH_MEMORY:
raise
# This specific error is caused by trying to spawn a new
# thread in a process belonging to a different Terminal
# Services session (for example a service).
raise NotImplementedError(
"Target process belongs to a different"
" Terminal Services session, cannot inject!"
)
# Remember the buffer address.
# It will be freed ONLY by the Thread.kill() method
# and the EventHandler class, otherwise you'll have to
# free it in your code.
aThread.pInjectedMemory = pbuffer
# Free the memory on error.
except Exception:
self.free(pbuffer)
raise
# Wait for the thread to finish.
if bWait:
aThread.wait(dwTimeout)
self.free(aThread.pInjectedMemory)
del aThread.pInjectedMemory
# Return the thread object.
return aThread
def clean_exit(self, dwExitCode = 0, bWait = False, dwTimeout = None):
"""
Injects a new thread to call ExitProcess().
Optionally waits for the injected thread to finish.
@warning: Setting C{bWait} to C{True} when the process is frozen by a
debug event will cause a deadlock in your debugger.
@type dwExitCode: int
@param dwExitCode: Process exit code.
@type bWait: bool
@param bWait: C{True} to wait for the process to finish.
C{False} to return immediately.
@type dwTimeout: int
@param dwTimeout: (Optional) Timeout value in milliseconds.
Ignored if C{bWait} is C{False}.
@raise WindowsError: An exception is raised on error.
"""
if not dwExitCode:
dwExitCode = 0
pExitProcess = self.resolve_label('kernel32!ExitProcess')
aThread = self.start_thread(pExitProcess, dwExitCode)
if bWait:
aThread.wait(dwTimeout)
#------------------------------------------------------------------------------
def _notify_create_process(self, event):
"""
Notify the creation of a new process.
This is done automatically by the L{Debug} class, you shouldn't need
to call it yourself.
@type event: L{CreateProcessEvent}
@param event: Create process event.
@rtype: bool
@return: C{True} to call the user-defined handle, C{False} otherwise.
"""
# Do not use super() here.
bCallHandler = _ThreadContainer._notify_create_process(self, event)
bCallHandler = bCallHandler and \
_ModuleContainer._notify_create_process(self, event)
return bCallHandler
#==============================================================================
class _ProcessContainer (object):
"""
Encapsulates the capability to contain Process objects.
@group Instrumentation:
start_process, argv_to_cmdline, cmdline_to_argv, get_explorer_pid
@group Processes snapshot:
scan, scan_processes, scan_processes_fast,
get_process, get_process_count, get_process_ids,
has_process, iter_processes, iter_process_ids,
find_processes_by_filename, get_pid_from_tid,
get_windows,
scan_process_filenames,
clear, clear_processes, clear_dead_processes,
clear_unattached_processes,
close_process_handles,
close_process_and_thread_handles
@group Threads snapshots:
scan_processes_and_threads,
get_thread, get_thread_count, get_thread_ids,
has_thread
@group Modules snapshots:
scan_modules, find_modules_by_address,
find_modules_by_base, find_modules_by_name,
get_module_count
"""
def __init__(self):
self.__processDict = dict()
def __initialize_snapshot(self):
"""