Skip to content

Commit

Permalink
adding gdt support, closes #1
Browse files Browse the repository at this point in the history
  • Loading branch information
clearbluejar committed Apr 30, 2024
1 parent 7405636 commit b7ef2d0
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 34 deletions.
3 changes: 2 additions & 1 deletion ghidriff/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def main():
min_func_len=args.min_func_len,
use_calling_counts=args.use_calling_counts,
bsim=args.bsim,
bsim_full=args.bsim_full
bsim_full=args.bsim_full,
gdts=args.gdt
)

d.setup_project(binary_paths, project_path, project_name, symbols_path)
Expand Down
77 changes: 44 additions & 33 deletions ghidriff/ghidra_diff_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def __init__(
min_func_len: int = 10,
use_calling_counts: bool = False,
bsim: bool = True,
bsim_full: bool = False) -> None:
bsim_full: bool = False,
gdts: list = []) -> None:


# setup engine logging
Expand Down Expand Up @@ -158,9 +159,12 @@ def __init__(

# if looking up more than calling_count_funcs_limit symbols, skip function counts
self.use_calling_counts = use_calling_counts

self.bsim = bsim
self.bsim_full = bsim_full

self.gdts = gdts

self.logger.debug(f'{vars(self)}')

@ staticmethod
Expand Down Expand Up @@ -192,7 +196,8 @@ def add_ghidra_args_to_parser(parser: argparse.ArgumentParser) -> None:
group.add_argument('--min-func-len', help='Minimum function length to consider for diff',
type=int, default=10),
group.add_argument('--use-calling-counts', help='Add calling/called reference counts', default=False,
action=argparse.BooleanOptionalAction)
action=argparse.BooleanOptionalAction)
group.add_argument('--gdt',action='append', help='Path to GDT file for analysis', default=[])

group = parser.add_argument_group('BSIM Options')
group.add_argument('--bsim', help='Toggle using BSIM correlation', default=True,
Expand Down Expand Up @@ -736,33 +741,33 @@ def setup_symbol_server(self, symbols_path: Union[str, Path], level=0, server_u

self.logger.info(f'Symbol Server Configured path: {symbolServerService.toString().strip()}')

# def apply_gdt(self, program: "ghidra.program.model.listing.Program", gdt_path: Union[str, Path], verbose: bool = False):
# """
# Apply GDT to program
# """

# from ghidra.app.cmd.function import ApplyFunctionDataTypesCmd
# from ghidra.program.model.symbol import SourceType
# from java.io import File
# from java.util import List
# from ghidra.program.model.data import FileDataTypeManager
# from ghidra.util.task import ConsoleTaskMonitor

# gdt_path = Path(gdt_path)

# if verbose:
# print('Enabling verbose gdt..')
# monitor = ConsoleTaskMonitor()
# else:
# monitor = ConsoleTaskMonitor().DUMMY_MONITOR

# archiveGDT = File(gdt_path)
# archiveDTM = FileDataTypeManager.openFileArchive(archiveGDT, False)
# always_replace = True
# createBookmarksEnabled = True
# cmd = ApplyFunctionDataTypesCmd(List.of(archiveDTM), None, SourceType.USER_DEFINED,
# always_replace, createBookmarksEnabled)
# cmd.applyTo(program, monitor)
def apply_gdt(self, program: "ghidra.program.model.listing.Program", gdt_path: Union[str, Path], verbose: bool = False):
    """
    Apply the function data types from a GDT archive to a program.

    Opens the archive read-only, runs Ghidra's ApplyFunctionDataTypesCmd
    against `program`, and always closes the archive afterwards.

    Args:
        program: Ghidra program the data types are applied to.
        gdt_path: Path to the .gdt data type archive.
        verbose: When True, use a console task monitor instead of the dummy monitor.
    """

    # Ghidra/Java classes are imported lazily, inside the method
    from ghidra.app.cmd.function import ApplyFunctionDataTypesCmd
    from ghidra.program.model.symbol import SourceType
    from java.io import File
    from java.util import List
    from ghidra.program.model.data import FileDataTypeManager
    from ghidra.util.task import ConsoleTaskMonitor

    gdt_path = Path(gdt_path)

    if verbose:
        print('Enabling verbose gdt..')
        monitor = ConsoleTaskMonitor()
    else:
        monitor = ConsoleTaskMonitor().DUMMY_MONITOR

    # java.io.File has a String constructor; pass a str explicitly instead of
    # relying on an implicit pathlib.Path conversion by the Java bridge
    archiveGDT = File(str(gdt_path))

    # second argument False: open the archive read-only (not for update)
    archiveDTM = FileDataTypeManager.openFileArchive(archiveGDT, False)

    try:
        always_replace = True
        createBookmarksEnabled = True
        cmd = ApplyFunctionDataTypesCmd(List.of(archiveDTM), None, SourceType.USER_DEFINED,
                                        always_replace, createBookmarksEnabled)
        cmd.applyTo(program, monitor)
    finally:
        # release the archive's file handle even if applying the types fails;
        # this method is called once per GDT for every analyzed program
        archiveDTM.close()

def analyze_program(self, df_or_prog: Union["ghidra.framework.model.DomainFile", "ghidra.program.model.listing.Program"], require_symbols: bool, force_analysis: bool = False, verbose_analysis: bool = False):

Expand All @@ -781,9 +786,15 @@ def analyze_program(self, df_or_prog: Union["ghidra.framework.model.DomainFile",

self.logger.info(f"Analyzing: {program}")

# gdt_names = [name for name in program.getDataTypeManager().getSourceArchives()]
# if len(gdt_names) > 0:
# print(f'Using file gdts: {gdt_names}')
for gdt in self.gdts:
self.logger.info(f"Loading GDT: {gdt}")
if not Path(gdt).exists():
raise FileNotFoundError(f'GDT Path not found {gdt}')
self.apply_gdt(program,gdt)

gdt_names = [name for name in program.getDataTypeManager().getSourceArchives()]
if len(gdt_names) > 0:
print(f'Using file gdts: {gdt_names}')

try:
if verbose_analysis or self.verbose_analysis:
Expand Down Expand Up @@ -814,7 +825,7 @@ def analyze_program(self, df_or_prog: Union["ghidra.framework.model.DomainFile",

# TODO make this argument optional, or provide custom analyzer config parsing
# This really helps with decompilation, was turned off by default in 10.x
# self.set_analysis_option_bool(program, 'Decompiler Parameter ID', True)
self.set_analysis_option_bool(program, 'Decompiler Parameter ID', True)

if self.no_symbols:
self.logger.warn(f'Disabling symbols for analysis! --no-symbols flag: {self.no_symbols}')
Expand Down
98 changes: 98 additions & 0 deletions tests/test_gdt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from pathlib import Path
import json
import pytest

from ghidriff import get_parser, get_engine_classes, VersionTrackingDiff, GhidraDiffEngine

SYMBOLS_DIR = 'symbols'
BINS_DIR = 'bins'


@pytest.mark.forked
def test_gdt_afd(shared_datadir: Path):
    """
    Tests application of a GDT to a program
    runs forked because each jpype jvm can only be initialized 1x

    Analyzes two afd.sys builds with the ntddk_64 GDT supplied via --gdt and
    checks that the external function IoIs32bitProcess ends up with the
    fully typed signature.
    """

    test_name = 'cve-2023-21768'
    output_path = shared_datadir / test_name
    output_path.mkdir(exist_ok=True, parents=True)
    symbols_path = shared_datadir / SYMBOLS_DIR
    bins_path = shared_datadir / BINS_DIR
    ghidra_project_path = output_path / 'ghidra_projects'
    ghidra_project_path.mkdir(exist_ok=True, parents=True)
    gdt_path = shared_datadir / 'ntddk_64.gdt'

    # setup bins
    old_bin_path = bins_path / 'afd.sys.x64.10.0.22621.1028'
    new_bin_path = bins_path / 'afd.sys.x64.10.0.22621.1415'

    assert old_bin_path.exists()
    assert new_bin_path.exists()
    # fail early with a clear message if the GDT fixture is missing
    assert gdt_path.exists(), f'missing GDT fixture: {gdt_path}'

    parser = get_parser()

    GhidraDiffEngine.add_ghidra_args_to_parser(parser)

    args = parser.parse_args([
        '-s',
        str(symbols_path),
        str(old_bin_path.absolute()),
        str(new_bin_path.absolute()),
        '-p',
        str(ghidra_project_path.absolute()),
        "--gdt",
        str(gdt_path.absolute())
    ])

    engine_log_path = output_path / parser.get_default('log_path')

    # args.new is a list of lists; flatten it behind the old binaries
    binary_paths = args.old + [bin for sublist in args.new for bin in sublist]

    binary_paths = [Path(path) for path in binary_paths]

    if any([not path.exists() for path in binary_paths]):
        missing_bins = [f'{path.name}' for path in binary_paths if not path.exists()]
        raise FileNotFoundError(f"Missing Bins: {' '.join(missing_bins)}")

    project_name = f'{args.project_name}-{binary_paths[0].name}-{binary_paths[-1].name}'

    DiffEngine: GhidraDiffEngine = VersionTrackingDiff

    d: GhidraDiffEngine = DiffEngine(args=args,
                                     verbose=True,
                                     threaded=args.threaded,
                                     max_ram_percent=args.max_ram_percent,
                                     print_jvm_flags=args.print_flags,
                                     jvm_args=args.jvm_args,
                                     force_analysis=args.force_analysis,
                                     force_diff=args.force_diff,
                                     verbose_analysis=args.va,
                                     no_symbols=args.no_symbols,
                                     engine_log_path=engine_log_path,
                                     engine_log_level=args.log_level,
                                     engine_file_log_level=args.file_log_level,
                                     gdts=args.gdt
                                     )

    d.setup_project(binary_paths, args.project_location, project_name, args.symbols_path)

    d.analyze_project()

    # every project file is opened; the last one opened is the program inspected below
    program = None
    for df in d.project.getRootFolder().getFiles():
        program = d.project.openProgram("/", df.getName(), False)

    assert program is not None, 'no programs found in the ghidra project'

    # without the GDT this function's signature uses undefined types instead of real ones
    known_typed_sig = "BOOLEAN IoIs32bitProcess(PIRP Irp)"
    symbol_to_test = "IoIs32bitProcess"

    # initialise so a missing symbol fails the assertion below instead of raising NameError
    signature_after_gdt = None

    for f in program.functionManager.externalFunctions:

        if f'{f.getName()}' == symbol_to_test:
            print(f)
            signature_after_gdt = f'{f.getSignature()}'

    assert signature_after_gdt is not None, f'{symbol_to_test} not found in external functions'
    assert signature_after_gdt == known_typed_sig

0 comments on commit b7ef2d0

Please sign in to comment.