In [762]:
import tkinter as tk
from tkinter import filedialog
from pathlib import Path

def select_c_file():
    """Ask the user to pick a C/C++ source file and return its relative path.

    A native "open file" dialog is shown while the Tk root window itself
    stays hidden.

    Returns:
        str: path of the chosen file, relative to the current working directory.

    Raises:
        Exception: if the dialog is cancelled ("No file selected.") or the
            resulting relative path contains a space.
        ValueError: if the chosen file is not located under the current
            working directory (raised by ``Path.relative_to``).
    """
    root = tk.Tk()
    root.withdraw()  # Hide the main window
    try:
        file_path = filedialog.askopenfilename(
            title="Select Buggy Code file",
            filetypes=[("C Files", "*.c"), ("C++ Files", "*.cpp")]
        )
    finally:
        # Release the hidden root so re-running the cell does not pile up Tk instances.
        root.destroy()

    # A cancelled dialog yields an empty value. Check it before any path
    # arithmetic, otherwise relative_to() fails with an unrelated ValueError
    # and the "No file selected." message is never reported.
    if not file_path:
        raise Exception("No file selected.")

    file_path = str(Path(file_path).relative_to(Path.cwd()))

    if " " in file_path:
        raise Exception(f"There should be no spaces in the file path: {file_path}")

    print(f"Selected file: {file_path}")
    return file_path

# Reset first so a cancelled or invalid selection cannot leave a path from an
# earlier run of this cell in the kernel; later cells then fail on the missing
# path instead of silently reusing a stale one.
c_file_path = None
try:
    c_file_path = select_c_file()
except Exception as e:
    print(e)

Selected file: data/bug10.c


# Setting up the LLM

In [72]:
!pip install -qU langchain-openai
!pip install -q langchain-core langgraph

In [478]:
import os
import getpass
from langchain_openai import ChatOpenAI

# Prompt only when the key is not already present, so re-running the cell (or
# running with the variable exported) does not block on interactive input.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass(prompt="Enter OpenAI API Key: ")

# Chat model shared by every LLM call in this notebook.
model = ChatOpenAI(model="gpt-4o-mini")

In [763]:
import re

# One left-to-right scan that recognises comments AND string/char literals.
# Literals are matched only so that comment markers inside them are skipped.
_C_COMMENT_OR_LITERAL = re.compile(
    r'//[^\n]*'                 # single-line comment (//...)
    r'|/\*.*?\*/'               # multi-line comment (/*...*/)
    r'|"(?:\\.|[^"\\\n])*"'     # string literal, honouring backslash escapes
    r"|'(?:\\.|[^'\\\n])*'",    # character literal, honouring backslash escapes
    re.DOTALL,
)


def remove_comments(code):
    """Return C/C++ source text with ``//`` and ``/* */`` comments removed.

    Comment markers that appear inside string or character literals (for
    example ``"http://host"``) are left alone, and a ``//`` inside a block
    comment no longer hides the closing ``*/``.

    Args:
        code: source text.

    Returns:
        str: the text with every comment replaced by an empty string; line
        breaks that end a ``//`` comment are kept.
    """
    def _drop_comment(match):
        token = match.group(0)
        # Only comments start with "/"; literals are written back unchanged.
        return "" if token.startswith("/") else token

    return _C_COMMENT_OR_LITERAL.sub(_drop_comment, code)

def read_and_clean_c_file(file_path):
    """Read a source file and return its text with comments stripped.

    Args:
        file_path: path of the file to read.

    Returns:
        str: file contents after ``remove_comments`` has been applied.
    """
    # Decode explicitly as UTF-8 rather than the platform default, replacing
    # undecodable bytes so a legacy-encoded source does not abort the read.
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        code = file.read()

    return remove_comments(code)

In [764]:
# Comment-free text of the selected source file; it is embedded in the LLM prompts built below.
c_file = read_and_clean_c_file(c_file_path)

In [765]:
# Prompt asking the model for a Python script that writes a sample input
# file for the program into a folder named "input".
prompt = f"""Write python code that will create an example input for the below code and store this input in folder called input. Please output only code.
{c_file}
"""

In [766]:
# Show the exact prompt text before it is sent to the model.
print(prompt)

Write python code that will create an example input for the below code and store this input in folder called input. Please output only code.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>



struct Field {
    uint8_t *data;
    uint32_t length;
    int allocated;
};

struct Record {
    struct Field *fields;
    uint32_t field_count;
    int allocated;
};

int main(int argc, char **argv) {
    if (argc < 2) {
        fprintf(stderr, "Usage: %s <inputfile>\n", argv[0]);
        return 1;
    }

    FILE *f = fopen(argv[1], "rb");
    if (!f) {
        fprintf(stderr, "Could not open file %s\n", argv[1]);
        return 1;
    }

    uint32_t record_count = 0;
    if (fread(&record_count, 4, 1, f) < 1) {
        
        fclose(f);
        return 0;
    }

    
    struct Record *records = malloc((size_t)record_count * sizeof(struct Record));
    if (!records && record_count > 0) {
        fclose(f);
        return 0; 
    }

    for (size_t i = 0; i < re

In [767]:
from langchain_core.messages import HumanMessage

# Send the input-generation prompt as a single human message and show the raw reply.
response = model.invoke([HumanMessage(content=prompt)])
print(response.content)

```python
import os
import struct

# Create input directory if it doesn't exist
os.makedirs('input', exist_ok=True)

# Example data
records = [
    {
        'field_count': 2,
        'fields': [
            b'Field1 data',
            b'Field2 data'
        ]
    },
    {
        'field_count': 1,
        'fields': [
            b'Field3 data'
        ]
    }
]

# Prepare binary data
record_count = len(records)
data = struct.pack('<I', record_count)  # Record count

for record in records:
    field_count = record['field_count']
    data += struct.pack('<I', field_count)  # Field count
    for field in record['fields']:
        length = len(field)
        data += struct.pack('<I', length)  # Length of the field
        data += field  # Field data

# Write to file
with open('input/example_input.bin', 'wb') as f:
    f.write(data)
```


In [768]:
# Extract the body of the first fenced code block instead of slicing fixed
# offsets: [10:-3] is only right when the reply is exactly "```python\n...```".
fence_match = re.search(r"```[^\n]*\n(.*?)```", response.content, flags=re.DOTALL)
generated_code = fence_match.group(1) if fence_match else response.content
print(generated_code)

import os
import struct

# Create input directory if it doesn't exist
os.makedirs('input', exist_ok=True)

# Example data
records = [
    {
        'field_count': 2,
        'fields': [
            b'Field1 data',
            b'Field2 data'
        ]
    },
    {
        'field_count': 1,
        'fields': [
            b'Field3 data'
        ]
    }
]

# Prepare binary data
record_count = len(records)
data = struct.pack('<I', record_count)  # Record count

for record in records:
    field_count = record['field_count']
    data += struct.pack('<I', field_count)  # Field count
    for field in record['fields']:
        length = len(field)
        data += struct.pack('<I', length)  # Length of the field
        data += field  # Field data

# Write to file
with open('input/example_input.bin', 'wb') as f:
    f.write(data)



In [769]:
import shutil

# Start from a clean slate: drop any "input" folder left over from a previous run.
if not os.path.isdir("input"):
    print("Folder does not exist: input")
else:
    shutil.rmtree("input")
    print("Deleted folder: input")

Deleted folder: input


In [770]:
# SECURITY: this runs model-generated code unsandboxed inside the kernel.
# Review the code printed above before executing this cell.
# The fenced block is located with a regex because the fixed slice [10:-3]
# breaks as soon as the reply is not exactly "```python\n...```".
fence_match = re.search(r"```[^\n]*\n(.*?)```", response.content, flags=re.DOTALL)
generated_code = fence_match.group(1) if fence_match else response.content
exec(generated_code)


In [771]:
# Prompt asking the model to classify how the program receives its input:
# "@" for the terminal, "@@" for a file. The answer is stored as input_type below.
prompt = f"""{c_file}

Does the above code read its input from a file or from the terminal?
If it reads from the terminal, output "@"
If it reads from a file, output "@@"

ONLY OUTPUT "@" or "@@"
"""

In [772]:
# Ask the classification question; note this overwrites the earlier `response`.
response = model.invoke([HumanMessage(content=prompt)])
print(response.content)

@@


In [773]:
# The model was asked for exactly "@" or "@@". Tolerate stray quotes or
# backticks around the marker, but refuse anything else: the value is
# forwarded verbatim to the fuzzing and gdb shell scripts.
raw_answer = str(response.content).strip()
input_type = raw_answer.strip("\"'` \n")
if input_type not in ("@", "@@"):
    raise ValueError(f"Expected '@' or '@@' from the model, got: {raw_answer!r}")
print(input_type)

@@


In [774]:
import subprocess

def run_afl_fuzz(file_path, fuzz_time, input_type, sudo_password):
    """Run AFL through ``run_afl.sh`` and return the directory holding crash inputs.

    The script's stdout and stderr are written to ``error_log.txt``; the log
    is deleted again unless the script exits with status 2.

    Args:
        file_path: source file handed to ``run_afl.sh``.
        fuzz_time: fuzzing duration, forwarded to the script as a string.
        input_type: input-mode marker forwarded to the script.
        sudo_password: forwarded to the script as its last argument.

    Returns:
        str: the fixed path ``"output/default/crashes"``.

    Raises:
        Exception: when the script exits with status 2.
    """
    # NOTE(review): the sudo password travels as a command-line argument and is
    # therefore visible in the process list while the script runs; consider
    # passing it through stdin if run_afl.sh can be changed.
    with open("error_log.txt", "w") as errorfd:
        result = subprocess.run([
            "bash", "run_afl.sh", file_path, f"{fuzz_time}", input_type, sudo_password
            ], stdout=errorfd, stderr=errorfd)

    if result.returncode == 2:
        raise Exception("AFL Aborted, Check error_log.txt or compilation_log.txt for details")

    # The log is only kept for the abort case. Delete it in-process instead of
    # spawning an external `rm`.
    os.remove("error_log.txt")

    # Analyze AFL output for any crashes
    crash_dir = "output/default/crashes"
    return crash_dir

def run_gdb(file_path, input_type, crash_dir, num_crashes):
    """Invoke ``run_gdb.sh`` on the AFL crash inputs and return its stack traces.

    The script's stdout is split on blank lines; the first chunk is discarded
    and the remaining chunks are returned as a list of strings.
    """
    command = ["bash", "run_gdb.sh", file_path, input_type, crash_dir, str(num_crashes)]
    completed = subprocess.run(command, stdout=subprocess.PIPE, text=True)

    # Everything before the first blank line is not a stack trace.
    _preamble, *stacktraces = completed.stdout.split("\n\n")
    return stacktraces
    
    

In [775]:
def read_crash_inputs(crash_dir):
    """Load every crash input stored in ``crash_dir`` as text.

    Args:
        crash_dir: directory to scan.

    Returns:
        list[str]: decoded contents of each regular file (``README.txt``
        excluded), ordered by file name. Empty when the directory does not
        exist.
    """
    crash_strings = []

    # Check if the directory exists
    if not os.path.isdir(crash_dir):
        print(f"Directory {crash_dir} does not exist.")
        return crash_strings

    # os.listdir() returns entries in arbitrary order; sort so that the result
    # is reproducible across runs and file systems.
    # NOTE(review): presumably this should match the order in which run_gdb.sh
    # walks the same directory, since inputs are later paired with stack
    # traces by position; confirm against the script.
    for filename in sorted(os.listdir(crash_dir)):
        if filename == "README.txt":
            continue
        file_path = os.path.join(crash_dir, filename)

        # Skip sub-directories and other non-regular entries instead of crashing in open().
        if not os.path.isfile(file_path):
            continue

        # Read the file as binary and decode to a string
        with open(file_path, "rb") as file:
            crash_input = file.read()
            # Decode with errors ignored to handle non-text binary data
            crash_strings.append(crash_input.decode(errors="ignore"))

    return crash_strings

In [776]:
langchain = True
gdb = True

if gdb:
    model_prompt = """You are a bug-fixing bot. You will attempt to fix buggy code across multiple iterations.
You will be given Buggy Code and a list of inputs generated by a fuzzer which have caused the program to crash.
Each input in the list causes a unique type of crash.
The gdb stacktrace of the crash will also be provided alongside the input.
Use this information to fix the all the bugs.
Only output the fully fixed code in the form of a string.
If the code you generated is still buggy, you will have to try again in the next iteration.
"""

    query_prompt = f"""
Given to you is Buggy Code and a list of inputs generated by a fuzzer which caused the program to crash.
Each input in the list causes a unique type of crash.
The gdb stacktrace of the crash will also be provided alongside the input.
Use this information to fix the all the bugs.
Only output the fully fixed code in the form of a string.
"""
else:
    model_prompt = """You are a bug-fixing bot. You will attempt to fix buggy code across multiple iterations.
You will be given Buggy Code and a list of inputs generated by a fuzzer which have caused the program to crash.
Each input in the list causes a unique type of crash.
Use this information to fix the all the bugs.
Only output the fully fixed code in the form of a string.
If the code you generated is still buggy, you will have to try again in the next iteration.
"""

    query_prompt = f"""
Given to you is Buggy Code and a list of inputs generated by a fuzzer which caused the program to crash.
Each input in the list causes a unique type of crash.
Use this information to fix the all the bugs.
Only output the fully fixed code in the form of a string.
"""


In [777]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# System instructions first, followed by the conversation supplied under "messages".
prompt_template = ChatPromptTemplate.from_messages([
    ("system", model_prompt),
    MessagesPlaceholder(variable_name="messages"),
])

In [778]:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, MessagesState, StateGraph
from langchain_core.messages import trim_messages

# Message trimmer: keeps the most recent messages ("last" strategy) within a
# 128000-token budget, using `model` to count tokens, with the kept window
# starting on a human message and never splitting a message.
trimmer = trim_messages(
    strategy="last",
    max_tokens=128000,
    token_counter=model,
    start_on="human",
    include_system=True,
    allow_partial=False,
)

# Define the function that calls the model
def call_model(state: MessagesState):
    """Graph node: trim the stored history, prepend the system prompt, query the model.

    Returns a state update whose "messages" entry is the model's reply.
    """
    history = trimmer.invoke(state["messages"])
    reply = model.invoke(prompt_template.invoke(history))
    return {"messages": reply}

In [779]:
# Fuzz -> collect crashes -> ask the LLM for a fix -> fuzz the fix, repeated
# until no crash is found or max_iterations fixes have been attempted.

# Remove fuzzer output left by a previous run.
if os.path.isdir("output/default"):
    shutil.rmtree("output/default")

buggy_code = c_file
fuzzer_input_path = c_file_path
file_extension = c_file_path.split(".")[-1]
sudo_password = getpass.getpass(prompt="Enter Sudo Password: ")

# Define a new graph
workflow = StateGraph(state_schema=MessagesState)

# Define the (single) node in the graph
workflow.add_edge(START, "model")
workflow.add_node("model", call_model)

# Add memory
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "memory"}}

fuzz_time = 30       # seconds per fuzzing round
num_crashes = 5      # maximum number of crash inputs shown to the LLM per round
max_iterations = 3   # maximum number of fix attempts
iterations = 0
while True:
    if iterations == 0:
        print(f"MEMORY: {langchain}")
        print(f"GDB Stacktrace: {gdb}")
        print("\nINITIAL BUG CHECK")
    try:
        print(f"FUZZING FOR {fuzz_time} SECONDS...")
        crash_dir = run_afl_fuzz(fuzzer_input_path, fuzz_time, input_type, sudo_password)
    except Exception as e:
        print(e)
        break

    crash_inputs = read_crash_inputs(crash_dir)

    if len(crash_inputs) == 0:
        if iterations == 0:
            print("AFL found no bugs, Try increasing fuzz time.")
        else:
            print(f"Code fixed in {iterations} iterations")
        break
    print("Unique Bugs Found:", len(crash_inputs))

    if iterations == max_iterations:
        print(f"Code was unable to be fixed in {iterations} iterations. Try increasing fuzz time and iterations")
        break

    iterations += 1
    print("\nIteration:", iterations)

    # repr() keeps control characters in the crash inputs visible and on one line.
    reported_inputs = [repr(item) for item in crash_inputs[:num_crashes]]

    if gdb:
        print("RUNNING GDB...")
        stacktraces = run_gdb(fuzzer_input_path, input_type, crash_dir, len(reported_inputs))
        # Inputs and traces are paired by position; zip() silently stops at the
        # shorter list if the script returned fewer traces than inputs.
        outside_info = [
            f"Fuzzer Generated Input:\n{gen_input}\n\ngdb Stacktrace:\n{stacktrace}\n"
            for gen_input, stacktrace in zip(reported_inputs, stacktraces)
        ]
        info = "\n".join(outside_info)
    else:
        info = "\n".join(reported_inputs)
        info = f"Fuzzer Generated Inputs:\n{info}"

    print("LLM FIXING CODE...")
    if langchain:
        query = f"""Iteration: {iterations}

Buggy Code:
{buggy_code}

{info}
"""

        input_messages = [HumanMessage(query)]
        output = app.invoke({"messages": input_messages}, config)

        response = output["messages"][-1]
    else:
        query = f"""Buggy Code:
{buggy_code}

{info}

{query_prompt}
"""

        response = model.invoke([HumanMessage(content=query)])

    # Take the body of the first fenced code block. Slicing fixed offsets
    # (3 + len(extension) from the front, 3 from the back) corrupts the code
    # whenever the fence label differs from the file extension, the reply has
    # text around the fence, or there is no fence at all.
    fence_match = re.search(r"```[^\n]*\n(.*?)```", response.content, flags=re.DOTALL)
    buggy_code = fence_match.group(1) if fence_match else response.content

    with open(f"fixed_code.{file_extension}", "w") as file:
        file.write(buggy_code)

    # The next round fuzzes the candidate fix rather than the original file.
    fuzzer_input_path = f"fixed_code.{file_extension}"
    

MEMORY: True
GDB Stacktrace: True

INITIAL BUG CHECK
FUZZING FOR 30 SECONDS...
Unique Bugs Found: 18

Iteration: 1
RUNNING GDB...
LLM FIXING CODE...
FUZZING FOR 30 SECONDS...
Code fixed in 1 iterations
