Skip to content

Commit

Permalink
Remove ida-interact (#817)
Browse files Browse the repository at this point in the history
* remove `ida-interact` from gef, replaced by a better version in `gef-extras`

* Fix double loading of external plugins (#816)

`register_external_command` was receiving an instance of a class for each new external script. This lead to a double initialization when calling `gef.gdb.load(cls)`. Fixed by registering directly a class (just like `register_command`)

* [lint] removed `xmlrpclib` unused import

* restored doctstring of `IdaInteractCommand`

* restoring python path insertion for python plugins for extra pacakges
  • Loading branch information
hugsy committed Feb 13, 2022
1 parent d86e7a0 commit 82b2570
Showing 1 changed file with 4 additions and 232 deletions.
236 changes: 4 additions & 232 deletions gef.py
Expand Up @@ -78,7 +78,6 @@
import time
import traceback
import warnings
import xmlrpc.client as xmlrpclib
from functools import lru_cache
from io import StringIO, TextIOWrapper
from types import ModuleType
Expand Down Expand Up @@ -3452,11 +3451,6 @@ def is_hex(pattern: str) -> bool:
return len(pattern) % 2 == 0 and all(c in string.hexdigits for c in pattern[2:])


def ida_synchronize_handler(_: "gdb.Event") -> None:
gdb.execute("ida-interact sync", from_tty=True)
return


def continue_handler(_: "gdb.Event") -> None:
"""GDB event handler for new object continue cases."""
return
Expand Down Expand Up @@ -5735,234 +5729,12 @@ def get_fd_from_result(self, res: str) -> int:

@register_command
class IdaInteractCommand(GenericCommand):
"""IDA Interact: set of commands to interact with IDA via a XML RPC service
deployed via the IDA script `ida_gef.py`. It should be noted that this command
can also be used to interact with Binary Ninja (using the script `binja_gef.py`)
using the same interface."""

"""**REMOVED** a better version of `ida-interact` is now hosted on `gef-extras`"""
_cmdline_ = "ida-interact"
_syntax_ = f"{_cmdline_} METHOD [ARGS]"
_aliases_ = ["binaryninja-interact", "bn", "binja"]
_example_ = f"\n{_cmdline_} Jump $pc\n{_cmdline_} SetColor $pc ff00ff"

def __init__(self) -> None:
super().__init__(prefix=False)
self["host"] = ("127.0.0.1", "IP address to use connect to IDA/Binary Ninja script")
self["port"] = (1337, "Port to use connect to IDA/Binary Ninja script")
self["sync_cursor"] = (False, "Enable real-time $pc synchronization")

self.sock = None
self.version = ("", "")
self.old_bps = set()
return

def is_target_alive(self, host: str, port: int) -> bool:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
s.connect((host, port))
s.close()
except OSError:
return False
return True

def connect(self, host: Optional[str] = None, port: Optional[int] = None) -> None:
"""Connect to the XML-RPC service."""
host = host or self["host"]
port = port or self["port"]

try:
sock = xmlrpclib.ServerProxy(f"http://{host}:{port:d}")
gef_on_stop_hook(ida_synchronize_handler)
gef_on_continue_hook(ida_synchronize_handler)
self.version = sock.version()
except ConnectionRefusedError:
err(f"Failed to connect to '{host}:{port:d}'")
sock = None
self.sock = sock
return

def disconnect(self) -> None:
gef_on_stop_unhook(ida_synchronize_handler)
gef_on_continue_unhook(ida_synchronize_handler)
self.sock = None
return

@deprecated("")
def do_invoke(self, argv: List[str]) -> None:
def parsed_arglist(arglist: List[str]) -> List[str]:
args = []
for arg in arglist:
try:
# try to solve the argument using gdb
argval = gdb.parse_and_eval(arg)
argval.fetch_lazy()
# check if value is addressable
argval = int(argval) if argval.address is None else int(argval.address)
# if the bin is PIE, we need to subtract the base address
if is_pie(get_filepath()) and main_base_address <= argval < main_end_address:
argval -= main_base_address
args.append(f"{argval:#x}")
except Exception:
# if gdb can't parse the value, let ida deal with it
args.append(arg)
return args

if self.sock is None:
# trying to reconnect
self.connect()
if self.sock is None:
self.disconnect()
return

if len(argv) == 0 or argv[0] in ("-h", "--help"):
method_name = argv[1] if len(argv) > 1 else None
self.usage(method_name)
return

method_name = argv[0].lower()
if method_name == "version":
self.version = self.sock.version()
info(f"Enhancing {Color.greenify('gef')} with {Color.redify(self.version[0])} "
f"(SDK {Color.yellowify(self.version[1])})")
return

if not is_alive():
main_base_address = main_end_address = 0
else:
vmmap = gef.memory.maps
main_base_address = min([x.page_start for x in vmmap if x.realpath == get_filepath()])
main_end_address = max([x.page_end for x in vmmap if x.realpath == get_filepath()])

try:
if method_name == "sync":
self.synchronize()
else:
method = getattr(self.sock, method_name)
if len(argv) > 1:
args = parsed_arglist(argv[1:])
res = method(*args)
else:
res = method()

if method_name == "importstruct":
self.import_structures(res)
else:
gef_print(str(res))

if self["sync_cursor"] is True:
jump = getattr(self.sock, "jump")
jump(hex(gef.arch.pc-main_base_address),)

except OSError:
self.disconnect()
return

def synchronize(self) -> None:
"""Submit all active breakpoint addresses to IDA/BN."""
pc = gef.arch.pc
vmmap = gef.memory.maps
base_address = min([x.page_start for x in vmmap if x.path == get_filepath()])
end_address = max([x.page_end for x in vmmap if x.path == get_filepath()])
if not (base_address <= pc < end_address):
# do not sync in library
return

breakpoints = gdb.breakpoints() or []
gdb_bps = set()
for bp in breakpoints:
if bp.enabled and not bp.temporary:
if bp.location[0] == "*": # if it's an address i.e. location starts with "*"
addr = parse_address(bp.location[1:])
else: # it is a symbol
addr = int(gdb.parse_and_eval(bp.location).address)
if not (base_address <= addr < end_address):
continue
gdb_bps.add(addr - base_address)

added = gdb_bps - self.old_bps
removed = self.old_bps - gdb_bps
self.old_bps = gdb_bps

try:
# it is possible that the server was stopped between now and the last sync
rc = self.sock.sync(f"{pc-base_address:#x}", list(added), list(removed))
except ConnectionRefusedError:
self.disconnect()
return

ida_added, ida_removed = rc

# add new bp from IDA
for new_bp in ida_added:
location = base_address + new_bp
gdb.Breakpoint(f"*{location:#x}", type=gdb.BP_BREAKPOINT)
self.old_bps.add(location)

# and remove the old ones
breakpoints = gdb.breakpoints() or []
for bp in breakpoints:
if bp.enabled and not bp.temporary:
if bp.location[0] == "*": # if it's an address i.e. location starts with "*"
addr = parse_address(bp.location[1:])
else: # it is a symbol
addr = int(gdb.parse_and_eval(bp.location).address)

if not (base_address <= addr < end_address):
continue

if (addr - base_address) in ida_removed:
if (addr - base_address) in self.old_bps:
self.old_bps.remove((addr - base_address))
bp.delete()
return

def usage(self, meth: Optional[str] = None) -> None:
if self.sock is None:
return

if meth is not None:
gef_print(titlify(meth))
gef_print(self.sock.system.methodHelp(meth))
return

info("Listing available methods and syntax examples: ")
for m in self.sock.system.listMethods():
if m.startswith("system."): continue
gef_print(titlify(m))
gef_print(self.sock.system.methodHelp(m))
return

def import_structures(self, structs: Dict[str, List[Tuple[int, str, int]]]) -> None:
if self.version[0] != "IDA Pro":
return

path = gef.config["pcustom.struct_path"]
if path is None:
return

if not os.path.isdir(path):
gef_makedirs(path)

for struct_name in structs:
fullpath = pathlib.Path(path) / f"{struct_name}.py"
with fullpath.open("w") as f:
f.write("from ctypes import *\n\n")
f.write("class ")
f.write(struct_name)
f.write("(Structure):\n")
f.write(" _fields_ = [\n")
for _, name, size in structs[struct_name]:
name = bytes(name, encoding="utf-8")
if size == 1: csize = "c_uint8"
elif size == 2: csize = "c_uint16"
elif size == 4: csize = "c_uint32"
elif size == 8: csize = "c_uint64"
else: csize = f"c_byte * {size}"
m = f' (\"{name}\", {csize}),\n'
f.write(m)
f.write("]\n")
ok(f"Success, {len(structs):d} structure{'s' if len(structs) > 1 else ''} imported")
err("`ida-interact` was removed from gef. You can find a better version as part of `gef-extras`")
return


Expand Down Expand Up @@ -10545,8 +10317,8 @@ def __load_extra_plugins(self) -> int:
if directories:
for directory in directories.split(";"):
directory = pathlib.Path(directory).expanduser()
if not directory.is_dir():
continue
if not directory.is_dir(): continue
sys.path.append(str(directory))
for entry in directory.iterdir():
if not entry.is_file(): continue
if entry.suffix != ".py": continue
Expand Down

0 comments on commit 82b2570

Please sign in to comment.