In [None]:
import pefile
import sys

def file_offset_to_virtual_address(pe_file_path, file_offset):
    try:
        # Load the PE file
        pe = pefile.PE(pe_file_path)

        # Get the base address, section alignment, and file alignment
        base_address = pe.OPTIONAL_HEADER.ImageBase
        section_alignment = pe.OPTIONAL_HEADER.SectionAlignment
        file_alignment = pe.OPTIONAL_HEADER.FileAlignment

        # Iterate through sections to find the one containing the file offset
        for section in pe.sections:
            # Get section details
            raw_data_offset = section.PointerToRawData
            raw_data_size = section.SizeOfRawData
            virtual_address = section.VirtualAddress

            # Check if the file offset falls within the section's raw data range
            if raw_data_offset <= file_offset < raw_data_offset + raw_data_size:
                # Calculate the offset within the section
                offset_within_section = file_offset - raw_data_offset

                # Calculate the virtual address
                final_address = base_address + virtual_address + offset_within_section
                print(f"Let's compare! PEFile says {hex(pe.get_rva_from_offset(file_offset))}... I say {hex(final_address)}")
                return final_address

        # If no section contains the file offset, return None
        return None

    except Exception as e:
        print(f"Error processing the PE file: {e}")
        return None


def virtual_address_to_file_offset(pe_file_path, virtual_address):
    try:
        # Load the PE file
        pe = pefile.PE(pe_file_path)

        # Get the base address, section alignment, and file alignment
        base_address = pe.OPTIONAL_HEADER.ImageBase
        section_alignment = pe.OPTIONAL_HEADER.SectionAlignment
        file_alignment = pe.OPTIONAL_HEADER.FileAlignment

        # Calculate the relative virtual address (RVA)
        rva = virtual_address - base_address

        # Iterate through sections to find the one containing the RVA
        for section in pe.sections:
            # Get section details
            raw_data_offset = section.PointerToRawData
            raw_data_size = section.SizeOfRawData
            virtual_address_start = section.VirtualAddress
            virtual_address_end = virtual_address_start + section.Misc_VirtualSize

            # Check if the RVA falls within the section's virtual address range
            if virtual_address_start <= rva < virtual_address_end:
                # Calculate the offset within the section
                offset_within_section = rva - virtual_address_start

                # Calculate the file offset
                file_offset = raw_data_offset + offset_within_section
                print(f"Let's compare! PEFile says {pe.get_offset_from_rva(rva)}... I say {file_offset}")
                return file_offset

        # If no section contains the RVA, return None
        return None

    except Exception as e:
        print(f"Error processing the PE file: {e}")
        return None




In [None]:
executables = {
    "anon_call2_vs_stripped": "../sunbench25/benchmark/indirect_calls/anonymous_functions/anon_call2_vs-stripped.exe",
    "anon_call3_vs_stripped": "../sunbench25/benchmark/indirect_calls/anonymous_functions/anon_call3_vs-stripped.exe",
    "virtual7_vs_stripped": "../sunbench25/benchmark/indirect_calls/virtual_tables/virtual7_vs-stripped.exe",
    "anon_jump1_vs_stripped": "../sunbench25/benchmark/indirect_jumps/anonymous_functions/anon_jump1_vs-stripped.exe",
    "anon_jump1_vs_2_stripped": "../sunbench25/benchmark/indirect_jumps/anonymous_functions/anon_jump1_vs_2-stripped.exe",
    "switch4_vs_stripped": "../sunbench25/benchmark/indirect_jumps/switch_statements/switch4_vs-stripped.exe",
    "switch5_vs_stripped": "../sunbench25/benchmark/indirect_jumps/switch_statements/switch5_vs-stripped.exe",
    "switch6_vs_stripped": "../sunbench25/benchmark/indirect_jumps/switch_statements/switch6_vs-stripped.exe",
}

cfr_mapping = {
    path: path.replace(".exe", "-cfr.json")
    for key, path in executables.items()
}


In [None]:
import json
import re
for exepath, cfrpath in cfr_mapping.items():
    with open(cfrpath, 'r') as f:
        cfr = json.load(f)
    question = cfr['question']
    match = re.search(r"'([^']*)'(?!.*')", question)
    offset = match.group(1)
    print(f"{exepath}: {offset}")
    file_offset_to_virtual_address(exepath, int(offset,16))

In [None]:
import json
import re
import pefile
import os
from typing import List, Dict
from dataclasses import dataclass

@dataclass
class Program:
    exe_path: str
    cfr_path: str
    question: str
    file_offset: int        # offset inside file (from the question)
    rva: int                # runtime RVA (for breakpoints)
    ground_truth: List[int] # list of file offsets from CFR JSON

# original flat mapping
executables: Dict[str, str] = {
    "anon_call2_vs_stripped":    "../sunbench25/benchmark/indirect_calls/anonymous_functions/anon_call2_vs-stripped.exe",
    "anon_call3_vs_stripped":    "../sunbench25/benchmark/indirect_calls/anonymous_functions/anon_call3_vs-stripped.exe",
    "virtual7_vs_stripped":      "../sunbench25/benchmark/indirect_calls/virtual_tables/virtual7_vs-stripped.exe",
    "anon_jump1_vs_stripped":    "../sunbench25/benchmark/indirect_jumps/anonymous_functions/anon_jump1_vs-stripped.exe",
    "anon_jump1_vs_2_stripped":  "../sunbench25/benchmark/indirect_jumps/anonymous_functions/anon_jump1_vs_2-stripped.exe",
    "switch4_vs_stripped":       "../sunbench25/benchmark/indirect_jumps/switch_statements/switch4_vs-stripped.exe",
    "switch5_vs_stripped":       "../sunbench25/benchmark/indirect_jumps/switch_statements/switch5_vs-stripped.exe",
    "switch6_vs_stripped":       "../sunbench25/benchmark/indirect_jumps/switch_statements/switch6_vs-stripped.exe",
}

programs: Dict[str, Program] = {}

for name, exe_path in executables.items():
    # derive the CFR JSON path
    cfr_path = exe_path.replace(".exe", "-cfr.json")

    # load the question, extract the quoted offset, and ground truth
    with open(cfr_path, 'r') as f:
        cfr = json.load(f)

    question = cfr.get('question', '')
    m = re.search(r"'([^']*)'(?!.*')", question)
    if not m:
        raise ValueError(f"Could not parse offset from question for {name!r}")
    file_offset = int(m.group(1), 16)

    # parse ground truth offsets
    gt_strs = cfr.get('groundtruth', [])
    ground_truth = [int(off, 16) for off in gt_strs]

    # compute the RVA from file offset
    pe = pefile.PE(exe_path)
    rva = pe.get_rva_from_offset(file_offset)

    # store everything in our nice dict
    programs[name] = Program(
        exe_path    = exe_path,
        cfr_path    = cfr_path,
        question    = question,
        file_offset = file_offset,
        rva         = rva,
        ground_truth= ground_truth
    )


In [50]:
import subprocess
import threading
import queue
import time
import os
import logging
from typing import Optional, Callable, List, Dict

# Configure root logger (can be customized by users of this module)
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)

class ProcessTerminationIntercepted(Exception):
    """Raised when debugger hits NtTerminateProcess — process is about to terminate."""
    pass

class CDBWrapper:
    """
    A Python wrapper for Windows CDB debugger that allows real-time interaction.
    Integrates Python logging for comprehensive debugging.
    """
    def __init__(self, cdb_path: str = None):
        """
        Initialize the CDB wrapper.
        
        Args:
            cdb_path: Path to cdb.exe. If None, assumes cdb.exe is in PATH.
        """
        self.cdb_path = cdb_path or "cdb.exe"
        self.process: Optional[subprocess.Popen] = None
        self.output_queue = queue.Queue()
        self.output_thread: Optional[threading.Thread] = None
        self.error_thread: Optional[threading.Thread] = None
        self.running = False
        self.output_callback: Optional[Callable[[str], None]] = None
        self.target_exe = None
        logger.debug("Initialized CDBWrapper with cdb_path=%s", self.cdb_path)
        
    def start(self, target_exe: str, args: List[str] = None, 
            output_callback: Callable[[str], None] = None) -> bool:
        """
        Start the debugger with a target executable.
        
        Args:
            target_exe: Path to the executable to debug
            args: Command line arguments for the target executable
            output_callback: Optional callback function for output processing
            
        Returns:
            True if debugger started successfully, False otherwise
        """
        if self.running:
            logger.warning("Debugger is already running")
            return False
            
        self.output_callback = output_callback
        self.target_exe = target_exe
        logger.debug("Starting debugger for '%s' with args=%s", target_exe, args)
        
        # Build command
        cmd = [self.cdb_path]
        
        # Add common options
        cmd.extend([
            "-lines",  # Load line number information
            "-srcpath", os.path.dirname(target_exe),  # Source path
            target_exe
        ])
        
        if args:
            cmd.extend(args)
        
        try:
            # Start the debugger process
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=0  # Unbuffered for real-time interaction
            )
            
            self.running = True
            
            # Start output reader threads
            self.output_thread = threading.Thread(
                target=self._read_output,
                args=(self.process.stdout,)
            )
            self.error_thread = threading.Thread(
                target=self._read_output,
                args=(self.process.stderr,)
            )
            
            self.output_thread.daemon = True
            self.error_thread.daemon = True
            
            self.output_thread.start()
            self.error_thread.start()
            logger.info("Debugger started, PID=%s", self.process.pid)
            return True
            
        except Exception as e:
            logger.exception("Failed to start debugger")
            return False
            
    def attach(self, pid: int, output_callback: Callable[[str], None] = None) -> bool:
        """
        Attach to a running process.
        
        Args:
            pid: Process ID to attach to
            output_callback: Optional callback function for output processing
            
        Returns:
            True if attached successfully, False otherwise
        """
        if self.running:
            logger.warning("Debugger is already running")
            return False
        
        self.output_callback = output_callback
        logger.debug("Attaching debugger to PID=%d", pid)
        cmd = [self.cdb_path, "-p", str(pid)]
        
        try:
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=0
            )
            
            self.running = True
            
            self.output_thread = threading.Thread(
                target=self._read_output,
                args=(self.process.stdout,)
            )
            self.error_thread = threading.Thread(
                target=self._read_output,
                args=(self.process.stderr,)
            )
            
            self.output_thread.daemon = True
            self.error_thread.daemon = True
            
            self.output_thread.start()
            self.error_thread.start()
            logger.info("Attached to process, PID=%s", pid)
            return True
            
        except Exception as e:
            logger.exception("Failed to attach debugger")
            return False
            
    def send_command(self, command: str) -> None:
        """
        Send a command to the debugger.
        
        Args:
            command: Debugger command to execute
        """
        if not self.running or not self.process:
            logger.error("Cannot send command, debugger not running")
            return
            
        try:
            logger.debug("Sending command: %s", command)
            self.process.stdin.write(command + "\n")
            self.process.stdin.flush()
        except Exception as e:
            logger.exception("Failed to send command")
            
    def get_output(self, timeout: float = 0.1) -> Optional[str]:
        """
        Get output from the debugger.
        
        Args:
            timeout: Timeout in seconds for waiting for output
        
        Returns:
            Output string if available, None otherwise
        """
        try:
            line = self.output_queue.get(timeout=timeout)
            logger.debug("Received output: %s", line.strip())
            return line
        except queue.Empty:
            return None
            
    def wait_for_prompt(self, timeout: float = 15.0) -> List[str]:
        start = time.time()
        buffer = ""
        lines = []

        while time.time() - start < timeout:
            line = self.get_output(timeout=0.1)
            if line:
                lines.append(line)
                buffer += line
                if buffer.rstrip().endswith(">"):
                    return lines

        logger.warning("Timeout waiting for debugger prompt")
        return lines

        
    def _read_output(self, stream) -> None:
        """
        Read output from a stream and put it in the queue.
        """
        while self.running:
            try:
                line = stream.readline()
                if line:
                    self.output_queue.put(line)
                    if self.output_callback:
                        self.output_callback(line)
                else:
                    break
            except Exception:
                logger.exception("Error reading debugger output")
                break
                
    def stop(self) -> None:
        """
        Stop the debugger.
        """
        if not self.running:
            return
            
        logger.info("Stopping debugger")
        self.running = False
        
        if self.process:
            try:
                # Send quit command
                self.send_command("q")
                time.sleep(0.5)
                
                # Terminate if still running
                if self.process.poll() is None:
                    self.process.terminate()
                    self.process.wait(timeout=5)
            except Exception as e:
                logger.exception("Error during debugger shutdown")

    def setargs(self, args: List[str]) -> bool:
        """
        Invokes .create with the original program and specified args.
        Returns success or failure as boolean
        """
        if not self.running:
            logger.error("Debugger is not running")
            return False
        
        arg_str = ""
        for arg in args:
            arg_str += f'"{arg}" '

        self.send_command(f'.create {self.target_exe} {arg_str}')
        self.wait_for_prompt()
        return True
    
    def restart(self) -> None:
        """
        Restarts the process with .restart
        """
        self.send_command(".restart")
        self.wait_for_prompt()

    def lm(self) -> List[str]:
        """
        List all loaded modules.

        Returns:
            A list of module names.
        """
        if not self.running:
            logger.error("Cannot list modules, debugger not running")
            return []
        logger.debug("Listing modules with 'lm' command")
        # Send lm command
        self.send_command('lm')
        # Wait for debugger prompt
        raw = self.wait_for_prompt(timeout=5)
        modules = []
        for line in raw:
            if "(deferred)" not in line: # this is a tragic attempt really...
                continue
            parts = line.strip().split()
            logger.debug(f"Obtained parts {parts} from line {line!r}")
            if parts:
                modules.append(parts[-2])
        logger.debug(f"Found modules: {modules}")
        return modules


    def bp(self, address: str) -> bool:
        """
        Set a breakpoint at the given address.

        Args:
            address: The address (RVA or absolute) where to set the breakpoint.

        Returns:
            True if the command was sent, False otherwise.
        """
        if not self.running:
            logger.error("Cannot set breakpoint, debugger not running")
            return False
        logger.debug("Setting breakpoint at %s", address)
        self.send_command(f"bp {address}")
        self.wait_for_prompt()
        return True

    def g(self) -> None:
        """
        Continue program execution until the next breakpoint or exit.
        """
        if not self.running:
            return
        logger.debug("Continuing execution ('g')")
        self.send_command('g')
        lines = self.wait_for_prompt()
        for line in lines:
            if "ntdll!NtTerminateProcess" in line:
                raise ProcessTerminationIntercepted("Arrived at ntdll!NtTerminateProcess when running \"go\"")
        regs = self.r()
        logger.debug(f"Now at RIP = {regs['rip']}")

        self.wait_for_prompt()

    def t(self) -> None:
        """
        Trace one instruction (single-step).
        """
        if not self.running:
            return
        logger.debug("Single-step instruction ('t')")
        self.send_command('t')
        self.wait_for_prompt()

    def r(self) -> Dict[str, str]:
        """
        Get current register values.

        Returns:
            A dict mapping register names to their hex values.
        """
        if not self.running:
            logger.error("Cannot read registers, debugger not running")
            return {}
        logger.debug("Reading registers ('r')")
        self.send_command('r')
        self.wait_for_prompt()
        regs: Dict[str, str] = {}
        buffer = ''
        while True:
            line = self.get_output(timeout=0.1)
            if line is None:
                break
            buffer += line.strip() + ' '
        # Parse tokens like 'rax=000000...'
        for token in buffer.split():
            if '=' in token:
                name, val = token.split('=', 1)
                regs[name] = val
        return regs
        
    def __enter__(self):
        """Context manager support."""
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup."""
        self.stop()


In [54]:
import os
name = "switch6_vs_stripped"
program = programs[name]
program.exe_path = os.path.abspath(program.exe_path)
print(f"{name}:")
print(f"  exe         = {program.exe_path}")
print(f"  cfr         = {program.cfr_path}")
print(f"  question    = {program.question!r}")
print(f"  file_offset = {hex(program.file_offset)}")
print(f"  rva         = {hex(program.rva)}")
print(f"  groundtruth = {[hex(x) for x in program.ground_truth]}\n")

cdb = CDBWrapper()
print(f"Attempting to run {program.exe_path}")
args = []
args_len = 0
success = cdb.start(program.exe_path)
modules = cdb.lm()
module = modules[0]
while args_len < 5:
    cdb.bp(f"{module}+{hex(program.rva)}")
    try:
        cdb.g()
    except ProcessTerminationIntercepted:
        args_len += 1
        args.append(f"{args_len}")
        cdb.setargs(args)
        cdb.restart()
        continue
    cdb.t()
    cdb.r()
    args_len += 1
    args.append(f"{args_len}")
    cdb.setargs(args)
    cdb.restart()
cdb.stop()

2025-08-05 19:47:02 [DEBUG] __main__: Initialized CDBWrapper with cdb_path=cdb.exe
2025-08-05 19:47:02 [DEBUG] __main__: Starting debugger for 'c:\Users\eknoc\Documents\my\suns-dataset\control_flow_recovery\sunbench25\benchmark\indirect_jumps\switch_statements\switch6_vs-stripped.exe' with args=None
2025-08-05 19:47:02 [INFO] __main__: Debugger started, PID=19028
2025-08-05 19:47:02 [DEBUG] __main__: Listing modules with 'lm' command
2025-08-05 19:47:02 [DEBUG] __main__: Sending command: lm
2025-08-05 19:47:02 [DEBUG] __main__: Received output: 
2025-08-05 19:47:02 [DEBUG] __main__: Received output: ************* Preparing the environment for Debugger Extensions Gallery repositories **************
2025-08-05 19:47:02 [DEBUG] __main__: Received output: ExtensionRepository : Implicit
2025-08-05 19:47:02 [DEBUG] __main__: Received output: UseExperimentalFeatureForNugetShare : true
2025-08-05 19:47:02 [DEBUG] __main__: Received output: AllowNugetExeUpdate : true
2025-08-05 19:47:02 [DEBUG]

switch6_vs_stripped:
  exe         = c:\Users\eknoc\Documents\my\suns-dataset\control_flow_recovery\sunbench25\benchmark\indirect_jumps\switch_statements\switch6_vs-stripped.exe
  cfr         = ../sunbench25/benchmark/indirect_jumps/switch_statements/switch6_vs-stripped-cfr.json
  question    = "What are the file offsets for the instructions that are the targets of the 'jmp rax' instruction at file offset '0x6577' ?"
  file_offset = 0x6577
  rva         = 0x7177
  groundtruth = ['0x657b', '0x6584', '0x658d', '0x6596', '0x659f', '0x65a8', '0x65b1', '0x65ba', '0x65c3', '0x65cc', '0x65d5', '0x65de', '0x65e5']

Attempting to run c:\Users\eknoc\Documents\my\suns-dataset\control_flow_recovery\sunbench25\benchmark\indirect_jumps\switch_statements\switch6_vs-stripped.exe


2025-08-05 19:47:02 [DEBUG] __main__: Received output: >>>>>>>>>>>>> Waiting for Debugger Extensions Gallery to Initialize completed, duration 0.032 seconds
2025-08-05 19:47:02 [DEBUG] __main__: Received output: ----> Repository : UserExtensions, Enabled: true, Packages count: 0
2025-08-05 19:47:02 [DEBUG] __main__: Received output: ----> Repository : LocalInstalled, Enabled: true, Packages count: 29
2025-08-05 19:47:02 [DEBUG] __main__: Received output: 
2025-08-05 19:47:02 [DEBUG] __main__: Received output: Microsoft (R) Windows Debugger Version 10.0.26100.4188 AMD64
2025-08-05 19:47:02 [DEBUG] __main__: Received output: Copyright (c) Microsoft Corporation. All rights reserved.
2025-08-05 19:47:02 [DEBUG] __main__: Received output: 
2025-08-05 19:47:02 [DEBUG] __main__: Received output: CommandLine: c:\Users\eknoc\Documents\my\suns-dataset\control_flow_recovery\sunbench25\benchmark\indirect_jumps\switch_statements\switch6_vs-stripped.exe
2025-08-05 19:47:03 [DEBUG] __main__: Received

KeyboardInterrupt: 