Skip to content

Commit

Permalink
changed path to pathlib.Path (#775)
Browse files Browse the repository at this point in the history
added test for `download_file`
  • Loading branch information
hugsy committed Jan 8, 2022
1 parent 32941ad commit d5a3bb6
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 112 deletions.
185 changes: 86 additions & 99 deletions gef.py
Expand Up @@ -1607,20 +1607,11 @@ def gef_pybytes(x):
@lru_cache()
def which(program):
"""Locate a command on the filesystem."""

def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

fpath = os.path.split(program)[0]
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
for path in os.environ["PATH"].split(os.pathsep):
dirname = pathlib.Path(path)
fpath = dirname / program
if os.access(fpath, os.X_OK):
return fpath

raise FileNotFoundError("Missing file `{:s}`".format(program))

Expand Down Expand Up @@ -1735,14 +1726,10 @@ def disable_redirect_output():

def gef_makedirs(path, mode=0o755):
"""Recursive mkdir() creation. If successful, return the absolute path of the directory created."""
abspath = os.path.expanduser(path)
abspath = os.path.realpath(abspath)

if os.path.isdir(abspath):
return abspath

os.makedirs(abspath, mode=mode, exist_ok=True)
return abspath
fpath = pathlib.Path(path)
if not fpath.is_dir():
fpath.mkdir(mode=mode, exist_ok=True, parents=True)
return fpath.absolute()


@lru_cache()
Expand Down Expand Up @@ -1938,9 +1925,11 @@ def gef_execute_gdb_script(commands):
with os.fdopen(fd, "w") as f:
f.write(commands)
f.flush()
if os.access(fname, os.R_OK):

fname = pathlib.Path(fname)
if fname.is_file() and os.access(fname, os.R_OK):
gdb.execute("source {:s}".format(fname))
os.unlink(fname)
fname.unlink()
return


Expand Down Expand Up @@ -3182,29 +3171,28 @@ def get_filepath():
return get_path_from_info_proc()


def download_file(target, use_cache=False, local_name=None):
"""Download filename `target` inside the mirror tree inside the gef.config["gef.tempdir"].
def download_file(remote_path, use_cache=False, local_name=None):
"""Download filename `remote_path` inside the mirror tree inside the gef.config["gef.tempdir"].
The tree architecture must be gef.config["gef.tempdir"]/gef/<local_pid>/<remote_filepath>.
This allow a "chroot-like" tree format."""

try:
local_root = os.path.sep.join([gef.config["gef.tempdir"], str(gef.session.pid)])
local_root = pathlib.Path(gef.config["gef.tempdir"]) / str(gef.session.pid)
if local_name is None:
local_path = os.path.sep.join([local_root, os.path.dirname(target)])
local_name = os.path.sep.join([local_path, os.path.basename(target)])
local_path = local_root / remote_path.strip(os.sep)
else:
local_path = os.path.sep.join([local_root, os.path.dirname(local_name)])
local_name = os.path.sep.join([local_path, os.path.basename(local_name)])
local_path = local_root / local_name.strip(os.sep)

if use_cache and os.access(local_name, os.R_OK):
return local_name
if use_cache and local_path.exists():
return str(local_path.absolute())

gef_makedirs(local_path)
gdb.execute("remote get {0:s} {1:s}".format(target, local_name))
local_path.parent.mkdir(parents=True, exist_ok=True)
gdb.execute("remote get {0:s} {1:s}".format(remote_path, str(local_path.absolute())))

local_path = str(local_path.absolute())
except gdb.error:
# fallback memory view
with open(local_name, "w") as f:
with open(local_path, "w") as f:
if is_32bit():
f.write("00000000-ffffffff rwxp 00000000 00:00 0 {}\n".format(get_filepath()))
else:
Expand All @@ -3213,7 +3201,8 @@ def download_file(target, use_cache=False, local_name=None):
except Exception as e:
err("download_file() failed: {}".format(str(e)))
local_name = None
return local_name

return local_path


def get_function_length(sym):
Expand Down Expand Up @@ -4580,9 +4569,9 @@ class VersionCommand(GenericCommand):
_example_ = "{:s}".format(_cmdline_)

def do_invoke(self, argv):
gef_fpath = os.path.abspath(os.path.expanduser(inspect.stack()[0][1]))
gef_dir = os.path.dirname(gef_fpath)
with open(gef_fpath, "rb") as f:
gef_fpath = pathlib.Path(inspect.stack()[0][1]).expanduser().absolute()
gef_dir = gef_fpath.parent
with gef_fpath.open("rb") as f:
gef_hash = hashlib.sha256(f.read()).hexdigest()

if os.access("{}/.git".format(gef_dir), os.X_OK):
Expand Down Expand Up @@ -5801,8 +5790,8 @@ def import_structures(self, structs):
gef_makedirs(path)

for struct_name in structs:
fullpath = os.path.join(path, "{}.py".format(struct_name))
with open(fullpath, "w") as f:
fullpath = pathlib.Path(path) / "{}.py".format(struct_name)
with fullpath.open("w") as f:
f.write("from ctypes import *\n\n")
f.write("class ")
f.write(struct_name)
Expand Down Expand Up @@ -7730,7 +7719,7 @@ class ProcessListingCommand(GenericCommand):

def __init__(self):
super().__init__(complete=gdb.COMPLETE_LOCATION)
self["ps_command"] = ( "{:s} auxww".format(gef.session.constants["ps"]), "`ps` command to get process information")
self["ps_command"] = ( "{:s} auxww".format(str(gef.session.constants["ps"])), "`ps` command to get process information")
return

@parse_arguments({"pattern": ""}, {"--attach": True, "--smart-scan": True})
Expand Down Expand Up @@ -11533,65 +11522,63 @@ def reset_caches(self):
"Consider updating to GDB {} or higher (with Python {} or higher).".format(".".join(map(str, GDB_MIN_VERSION)), ".".join(map(str, PYTHON_MIN_VERSION))))
exit(1)

else:
try:
pyenv = which("pyenv")
PYENV_ROOT = gef_pystring(subprocess.check_output([pyenv, "root"]).strip())
PYENV_VERSION = gef_pystring(subprocess.check_output([pyenv, "version-name"]).strip())
site_packages_dir = os.path.join(PYENV_ROOT, "versions", PYENV_VERSION, "lib",
"python{}".format(PYENV_VERSION[:3]), "site-packages")
site.addsitedir(site_packages_dir)
except FileNotFoundError:
pass

# When using a Python virtual environment, GDB still loads the system-installed Python
# so GEF doesn't load site-packages dir from environment
# In order to fix it, from the shell with venv activated we run the python binary,
# take and parse its path, add the path to the current python process using sys.path.extend
PYTHONBIN = which("python3")
PREFIX = gef_pystring(subprocess.check_output([PYTHONBIN, '-c', 'import os, sys;print((sys.prefix))'])).strip("\\n")
if PREFIX != sys.base_prefix:
SITE_PACKAGES_DIRS = subprocess.check_output(
[PYTHONBIN, "-c", "import os, sys;print(os.linesep.join(sys.path).strip())"]).decode("utf-8").split()
sys.path.extend(SITE_PACKAGES_DIRS)

# setup prompt
gdb.prompt_hook = __gef_prompt__

# setup config
gdb_initial_settings = (
"set confirm off",
"set verbose off",
"set pagination off",
"set print elements 0",
"set history save on",
"set history filename ~/.gdb_history",
"set output-radix 0x10",
"set print pretty on",
"set disassembly-flavor intel",
"handle SIGALRM print nopass",
)
for cmd in gdb_initial_settings:
try:
pyenv = which("pyenv")
PYENV_ROOT = gef_pystring(subprocess.check_output([pyenv, "root"]).strip())
PYENV_VERSION = gef_pystring(subprocess.check_output([pyenv, "version-name"]).strip())
site_packages_dir = os.path.join(PYENV_ROOT, "versions", PYENV_VERSION, "lib",
"python{}".format(PYENV_VERSION[:3]), "site-packages")
site.addsitedir(site_packages_dir)
except FileNotFoundError:
gdb.execute(cmd)
except gdb.error:
pass

# When using a Python virtual environment, GDB still loads the system-installed Python
# so GEF doesn't load site-packages dir from environment
# In order to fix it, from the shell with venv activated we run the python binary,
# take and parse its path, add the path to the current python process using sys.path.extend

PYTHONBIN = which("python3")
PREFIX = gef_pystring(subprocess.check_output([PYTHONBIN, '-c', 'import os, sys;print((sys.prefix))'])).strip("\\n")
if PREFIX != sys.base_prefix:
SITE_PACKAGES_DIRS = subprocess.check_output(
[PYTHONBIN, "-c", "import os, sys;print(os.linesep.join(sys.path).strip())"]).decode("utf-8").split()
sys.path.extend(SITE_PACKAGES_DIRS)

# setup prompt
gdb.prompt_hook = __gef_prompt__

# setup config
gdb_initial_settings = (
"set confirm off",
"set verbose off",
"set pagination off",
"set print elements 0",
"set history save on",
"set history filename ~/.gdb_history",
"set output-radix 0x10",
"set print pretty on",
"set disassembly-flavor intel",
"handle SIGALRM print nopass",
)
for cmd in gdb_initial_settings:
try:
gdb.execute(cmd)
except gdb.error:
pass

# load GEF
reset()
# load GEF
reset()

# gdb events configuration
gef_on_continue_hook(continue_handler)
gef_on_stop_hook(hook_stop_handler)
gef_on_new_hook(new_objfile_handler)
gef_on_exit_hook(exit_handler)
gef_on_memchanged_hook(memchanged_handler)
gef_on_regchanged_hook(regchanged_handler)
# gdb events configuration
gef_on_continue_hook(continue_handler)
gef_on_stop_hook(hook_stop_handler)
gef_on_new_hook(new_objfile_handler)
gef_on_exit_hook(exit_handler)
gef_on_memchanged_hook(memchanged_handler)
gef_on_regchanged_hook(regchanged_handler)

if gdb.current_progspace().filename is not None:
# if here, we are sourcing gef from a gdb session already attached
# we must force a call to the new_objfile handler (see issue #278)
new_objfile_handler(None)
if gdb.current_progspace().filename is not None:
# if here, we are sourcing gef from a gdb session already attached
# we must force a call to the new_objfile handler (see issue #278)
new_objfile_handler(None)

GefTmuxSetup()
GefTmuxSetup()
13 changes: 13 additions & 0 deletions tests/helpers.py
Expand Up @@ -159,3 +159,16 @@ def _target(name: str, extension: str = ".out") -> Path:
if not target.exists():
raise FileNotFoundError(f"Could not find file '{target}'")
return target


def start_gdbserver(exe=_target("default"), port=1234):
return subprocess.Popen(["gdbserver", f":{port}", exe],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def stop_gdbserver(gdbserver):
"""Stops the gdbserver and waits until it is terminated if it was
still running. Needed to make the used port available again."""
if gdbserver.poll() is None:
gdbserver.kill()
gdbserver.wait()
return
23 changes: 10 additions & 13 deletions tests/runtests.py
Expand Up @@ -22,7 +22,8 @@
include_for_architectures,
ARCH,
is_64b,
_target
_target,
start_gdbserver, stop_gdbserver,
)

BIN_LS = Path("/bin/ls")
Expand Down Expand Up @@ -197,18 +198,6 @@ def test_cmd_got(self):
return

def test_cmd_gef_remote(self):
def start_gdbserver(exe=_target("default"), port=1234):
return subprocess.Popen(["gdbserver", f":{port}", exe],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def stop_gdbserver(gdbserver):
"""Stops the gdbserver and waits until it is terminated if it was
still running. Needed to make the used port available again."""
if gdbserver.poll() is None:
gdbserver.kill()
gdbserver.wait()
return

before = ["gef-remote :1234"]
gdbserver = start_gdbserver()
res = gdb_start_silent_cmd("vmmap", before=before)
Expand Down Expand Up @@ -900,6 +889,14 @@ def test_func_parse_address(self):
self.assertException(res)
return

def test_func_download_file(self):
gdbsrv = start_gdbserver(BIN_LS)
func = f"download_file('{str(BIN_LS)}')"
res = gdb_test_python_method(func)
stop_gdbserver(gdbsrv)
self.assertNoException(res)
return


class TestGdbFunctionsUnit(GefUnitTestGeneric):
"""Tests gdb convenience functions added by GEF."""
Expand Down

0 comments on commit d5a3bb6

Please sign in to comment.